You are viewing a plain-text version of this content. The canonical link to the original message is not preserved in this plain-text copy.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/07/06 05:50:06 UTC
spark git commit: [SPARK-8549] [SPARKR] Fix the line length of SparkR
Repository: spark
Updated Branches:
refs/heads/master f9c448dce -> a0cb111b2
[SPARK-8549] [SPARKR] Fix the line length of SparkR
[[SPARK-8549] Fix the line length of SparkR - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-8549)
Author: Yu ISHIKAWA <yu...@gmail.com>
Closes #7204 from yu-iskw/SPARK-8549 and squashes the following commits:
6fb131a [Yu ISHIKAWA] Fix the typo
1737598 [Yu ISHIKAWA] [SPARK-8549][SparkR] Fix the line length of SparkR
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0cb111b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0cb111b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0cb111b
Branch: refs/heads/master
Commit: a0cb111b22cb093e86b0daeecb3dcc41d095df40
Parents: f9c448d
Author: Yu ISHIKAWA <yu...@gmail.com>
Authored: Sun Jul 5 20:50:02 2015 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Sun Jul 5 20:50:02 2015 -0700
----------------------------------------------------------------------
R/pkg/R/generics.R | 3 ++-
R/pkg/R/pairRDD.R | 12 ++++++------
R/pkg/R/sparkR.R | 9 ++++++---
R/pkg/R/utils.R | 31 ++++++++++++++++++-------------
R/pkg/inst/tests/test_includeJAR.R | 4 ++--
R/pkg/inst/tests/test_rdd.R | 12 ++++++++----
R/pkg/inst/tests/test_sparkSQL.R | 11 +++++++++--
7 files changed, 51 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a0cb111b/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 79055b7..fad9d71 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -20,7 +20,8 @@
# @rdname aggregateRDD
# @seealso reduce
# @export
-setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
+setGeneric("aggregateRDD",
+ function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
# @rdname cache-methods
# @export
http://git-wip-us.apache.org/repos/asf/spark/blob/a0cb111b/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 7f902ba..0f1179e 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -560,8 +560,8 @@ setMethod("join",
# Left outer join two RDDs
#
# @description
-# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V).
-# The key types of the two RDDs should be the same.
+# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of
+# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
@@ -597,8 +597,8 @@ setMethod("leftOuterJoin",
# Right outer join two RDDs
#
# @description
-# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V).
-# The key types of the two RDDs should be the same.
+# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of
+# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
@@ -634,8 +634,8 @@ setMethod("rightOuterJoin",
# Full outer join two RDDs
#
# @description
-# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V).
-# The key types of the two RDDs should be the same.
+# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of
+# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
http://git-wip-us.apache.org/repos/asf/spark/blob/a0cb111b/R/pkg/R/sparkR.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 86233e0..048eb8e 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -105,7 +105,8 @@ sparkR.init <- function(
sparkPackages = "") {
if (exists(".sparkRjsc", envir = .sparkREnv)) {
- cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
+ cat(paste("Re-using existing Spark Context.",
+ "Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n"))
return(get(".sparkRjsc", envir = .sparkREnv))
}
@@ -180,14 +181,16 @@ sparkR.init <- function(
sparkExecutorEnvMap <- new.env()
if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
- sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <- paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
+ sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
+ paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
}
for (varname in names(sparkExecutorEnv)) {
sparkExecutorEnvMap[[varname]] <- sparkExecutorEnv[[varname]]
}
nonEmptyJars <- Filter(function(x) { x != "" }, jars)
- localJarPaths <- sapply(nonEmptyJars, function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
+ localJarPaths <- sapply(nonEmptyJars,
+ function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
# Set the start time to identify jobjs
# Seconds resolution is good enough for this purpose, so use ints
http://git-wip-us.apache.org/repos/asf/spark/blob/a0cb111b/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 13cec0f..ea629a6 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -334,18 +334,21 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
"MEMORY_ONLY_SER_2",
"OFF_HEAP")) {
match.arg(newLevel)
+ storageLevelClass <- "org.apache.spark.storage.StorageLevel"
storageLevel <- switch(newLevel,
- "DISK_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY"),
- "DISK_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY_2"),
- "MEMORY_AND_DISK" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK"),
- "MEMORY_AND_DISK_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_2"),
- "MEMORY_AND_DISK_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER"),
- "MEMORY_AND_DISK_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER_2"),
- "MEMORY_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY"),
- "MEMORY_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_2"),
- "MEMORY_ONLY_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER"),
- "MEMORY_ONLY_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER_2"),
- "OFF_HEAP" = callJStatic("org.apache.spark.storage.StorageLevel", "OFF_HEAP"))
+ "DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
+ "DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
+ "MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
+ "MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
+ "MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
+ "MEMORY_AND_DISK_SER"),
+ "MEMORY_AND_DISK_SER_2" = callJStatic(storageLevelClass,
+ "MEMORY_AND_DISK_SER_2"),
+ "MEMORY_ONLY" = callJStatic(storageLevelClass, "MEMORY_ONLY"),
+ "MEMORY_ONLY_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_2"),
+ "MEMORY_ONLY_SER" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER"),
+ "MEMORY_ONLY_SER_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER_2"),
+ "OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
}
# Utility function for functions where an argument needs to be integer but we want to allow
@@ -545,9 +548,11 @@ mergePartitions <- function(rdd, zip) {
lengthOfKeys <- part[[len - lengthOfValues]]
stopifnot(len == lengthOfKeys + lengthOfValues)
- # For zip operation, check if corresponding partitions of both RDDs have the same number of elements.
+ # For zip operation, check if corresponding partitions
+ # of both RDDs have the same number of elements.
if (zip && lengthOfKeys != lengthOfValues) {
- stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
+ stop(paste("Can only zip RDDs with same number of elements",
+ "in each pair of corresponding partitions."))
}
if (lengthOfKeys > 1) {
http://git-wip-us.apache.org/repos/asf/spark/blob/a0cb111b/R/pkg/inst/tests/test_includeJAR.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_includeJAR.R b/R/pkg/inst/tests/test_includeJAR.R
index 844d86f..cc1faea 100644
--- a/R/pkg/inst/tests/test_includeJAR.R
+++ b/R/pkg/inst/tests/test_includeJAR.R
@@ -18,8 +18,8 @@ context("include an external JAR in SparkContext")
runScript <- function() {
sparkHome <- Sys.getenv("SPARK_HOME")
- jarPath <- paste("--jars",
- shQuote(file.path(sparkHome, "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar")))
+ sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar"
+ jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath)))
scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
submitPath <- file.path(sparkHome, "bin/spark-submit")
res <- system2(command = submitPath,
http://git-wip-us.apache.org/repos/asf/spark/blob/a0cb111b/R/pkg/inst/tests/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
index fc3c01d..b796928 100644
--- a/R/pkg/inst/tests/test_rdd.R
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -669,13 +669,15 @@ test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)), list(3, list(3, NULL)))
+ expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
+ list(2, list(NULL, 4)), list(3, list(3, NULL)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)), list("c", list(1, NULL)))
+ expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
+ list("a", list(3, 1)), list("c", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
@@ -683,13 +685,15 @@ test_that("fullOuterJoin() on pairwise RDDs", {
rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)), list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
+ sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
+ list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
+ sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
+ list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})
test_that("sortByKey() on pairwise RDDs", {
http://git-wip-us.apache.org/repos/asf/spark/blob/a0cb111b/R/pkg/inst/tests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 0e4235e..b0ea388 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -391,7 +391,7 @@ test_that("collect() and take() on a DataFrame return the same number of rows an
expect_equal(ncol(collect(df)), ncol(take(df, 10)))
})
-test_that("multiple pipeline transformations starting with a DataFrame result in an RDD with the correct values", {
+test_that("multiple pipeline transformations result in an RDD with the correct values", {
df <- jsonFile(sqlContext, jsonPath)
first <- lapply(df, function(row) {
row$age <- row$age + 5
@@ -756,7 +756,14 @@ test_that("toJSON() returns an RDD of the correct values", {
test_that("showDF()", {
df <- jsonFile(sqlContext, jsonPath)
s <- capture.output(showDF(df))
- expect_output(s , "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
+ expected <- paste("+----+-------+\n",
+ "| age| name|\n",
+ "+----+-------+\n",
+ "|null|Michael|\n",
+ "| 30| Andy|\n",
+ "| 19| Justin|\n",
+ "+----+-------+\n", sep="")
+ expect_output(s , expected)
})
test_that("isLocal()", {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org