Posted to commits@spark.apache.org by sh...@apache.org on 2015/12/03 22:25:32 UTC

spark git commit: [SPARK-12019][SPARKR] Support character vector for sparkR.init(), check param and fix doc

Repository: spark
Updated Branches:
  refs/heads/master a02d47277 -> 2213441e5


[SPARK-12019][SPARKR] Support character vector for sparkR.init(), check param and fix doc

Also adds tests. spark-submit expects a comma-separated list, so character vectors for sparkJars and sparkPackages are now collapsed with "," before being passed to it; a comma-separated string still works but is deprecated.
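
For illustration, a minimal sketch of the two call styles (jar names are hypothetical); the string form still works but now emits a deprecation warning:

  # old style: a single comma-separated string (now deprecated)
  sc <- sparkR.init(sparkJars = "one.jar,two.jar")
  # new style: a character vector, collapsed to a comma-separated
  # list internally before it is handed to spark-submit
  sc <- sparkR.init(sparkJars = c("one.jar", "two.jar"))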

Author: felixcheung <fe...@hotmail.com>

Closes #10034 from felixcheung/sparkrinitdoc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2213441e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2213441e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2213441e

Branch: refs/heads/master
Commit: 2213441e5e0fba01e05826257604aa427cdf2598
Parents: a02d472
Author: felixcheung <fe...@hotmail.com>
Authored: Thu Dec 3 13:25:20 2015 -0800
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Thu Dec 3 13:25:20 2015 -0800

----------------------------------------------------------------------
 R/pkg/R/client.R                | 10 +++++--
 R/pkg/R/sparkR.R                | 56 ++++++++++++++++++++++++------------
 R/pkg/R/utils.R                 |  5 ++++
 R/pkg/inst/tests/test_client.R  |  9 ++++++
 R/pkg/inst/tests/test_context.R | 20 +++++++++++++
 5 files changed, 79 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2213441e/R/pkg/R/client.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
index c811d1d..25e9939 100644
--- a/R/pkg/R/client.R
+++ b/R/pkg/R/client.R
@@ -44,12 +44,16 @@ determineSparkSubmitBin <- function() {
 }
 
 generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
+  jars <- paste0(jars, collapse = ",")
   if (jars != "") {
-    jars <- paste("--jars", jars)
+    # construct the jars argument with a space between --jars and comma-separated values
+    jars <- paste0("--jars ", jars)
   }
 
-  if (!identical(packages, "")) {
-    packages <- paste("--packages", packages)
+  packages <- paste0(packages, collapse = ",")
+  if (packages != "") {
+    # construct the packages argument with a space between --packages and comma-separated values
+    packages <- paste0("--packages ", packages)
   }
 
   combinedArgs <- paste(jars, packages, sparkSubmitOpts, args, sep = " ")
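
To make the collapse concrete, a hedged sketch of what generateSparkSubmitArgs now returns for vector inputs (the jar and package names here are made up):

  args <- generateSparkSubmitArgs("", sparkHome = "", jars = c("one.jar", "two.jar"),
                                  sparkSubmitOpts = "", packages = "A:B:1.0")
  # args contains "--jars one.jar,two.jar" and "--packages A:B:1.0"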

http://git-wip-us.apache.org/repos/asf/spark/blob/2213441e/R/pkg/R/sparkR.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 7ff3fa6..d2bfad5 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -86,13 +86,13 @@ sparkR.stop <- function() {
 #' and use SparkR, refer to SparkR programming guide at
 #' \url{http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparkcontext-sqlcontext}.
 #'
-#' @param master The Spark master URL.
+#' @param master The Spark master URL
 #' @param appName Application name to register with cluster manager
 #' @param sparkHome Spark Home directory
-#' @param sparkEnvir Named list of environment variables to set on worker nodes.
-#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
-#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
-#' @param sparkPackages Character string vector of packages from spark-packages.org
+#' @param sparkEnvir Named list of environment variables to set on worker nodes
+#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors
+#' @param sparkJars Character vector of jar files to pass to the worker nodes
+#' @param sparkPackages Character vector of packages from spark-packages.org
 #' @export
 #' @examples
 #'\dontrun{
@@ -102,7 +102,9 @@ sparkR.stop <- function() {
 #' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
 #'                  list(spark.executor.memory="4g"),
 #'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
-#'                  c("jarfile1.jar","jarfile2.jar"))
+#'                  c("one.jar", "two.jar", "three.jar"),
+#'                  c("com.databricks:spark-avro_2.10:2.0.1",
+#'                    "com.databricks:spark-csv_2.10:1.3.0"))
 #'}
 
 sparkR.init <- function(
@@ -120,15 +122,8 @@ sparkR.init <- function(
     return(get(".sparkRjsc", envir = .sparkREnv))
   }
 
-  jars <- suppressWarnings(normalizePath(as.character(sparkJars)))
-
-  # Classpath separator is ";" on Windows
-  # URI needs four /// as from http://stackoverflow.com/a/18522792
-  if (.Platform$OS.type == "unix") {
-    uriSep <- "//"
-  } else {
-    uriSep <- "////"
-  }
+  jars <- processSparkJars(sparkJars)
+  packages <- processSparkPackages(sparkPackages)
 
   sparkEnvirMap <- convertNamedListToEnv(sparkEnvir)
 
@@ -145,7 +140,7 @@ sparkR.init <- function(
         sparkHome = sparkHome,
         jars = jars,
         sparkSubmitOpts = submitOps,
-        packages = sparkPackages)
+        packages = packages)
    # wait at most 100 seconds for JVM to launch
     wait <- 0.1
     for (i in 1:25) {
@@ -195,8 +190,14 @@ sparkR.init <- function(
       paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
   }
 
-  nonEmptyJars <- Filter(function(x) { x != "" }, jars)
-  localJarPaths <- lapply(nonEmptyJars,
+  # Classpath separator is ";" on Windows
+  # URI needs four /// as from http://stackoverflow.com/a/18522792
+  if (.Platform$OS.type == "unix") {
+    uriSep <- "//"
+  } else {
+    uriSep <- "////"
+  }
+  localJarPaths <- lapply(jars,
                           function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
 
   # Set the start time to identify jobjs
@@ -366,3 +367,22 @@ getClientModeSparkSubmitOpts <- function(submitOps, sparkEnvirMap) {
   # --option must be before the application class "sparkr-shell" in submitOps
   paste0(paste0(envirToOps, collapse = ""), submitOps)
 }
+
+# Utility function that handles the sparkJars argument and normalizes paths
+processSparkJars <- function(jars) {
+  splittedJars <- splitString(jars)
+  if (length(splittedJars) > length(jars)) {
+    warning("sparkJars as a comma-separated string is deprecated, use character vector instead")
+  }
+  normalized <- suppressWarnings(normalizePath(splittedJars))
+  normalized
+}
+
+# Utility function that handles the sparkPackages argument
+processSparkPackages <- function(packages) {
+  splittedPackages <- splitString(packages)
+  if (length(splittedPackages) > length(packages)) {
+    warning("sparkPackages as a comma-separated string is deprecated, use character vector instead")
+  }
+  splittedPackages
+}
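
A short sketch of how the two new helpers behave (inputs are hypothetical; non-existent paths come back from normalizePath unchanged since its warning is suppressed):

  processSparkJars(c("one.jar", "two.jar"))  # no warning, paths normalized
  processSparkJars("one.jar,two.jar")        # same result, plus deprecation warning
  processSparkPackages("a:b:1.0,c:d:2.0")    # c("a:b:1.0", "c:d:2.0"), with warning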

http://git-wip-us.apache.org/repos/asf/spark/blob/2213441e/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 45c77a8..43105aa 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -636,3 +636,8 @@ assignNewEnv <- function(data) {
   }
   env
 }
+
+# Utility function to split on ',' and whitespace and drop empty tokens
+splitString <- function(input) {
+  Filter(nzchar, unlist(strsplit(input, ",|\\s")))
+}
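
splitString tokenizes on commas and any whitespace and drops empty tokens; a quick illustrative sketch:

  splitString(" abc ,, def ")     # c("abc", "def")
  splitString(c("a,b", "", "c"))  # c("a", "b", "c")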

http://git-wip-us.apache.org/repos/asf/spark/blob/2213441e/R/pkg/inst/tests/test_client.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_client.R b/R/pkg/inst/tests/test_client.R
index 8a20991..a0664f3 100644
--- a/R/pkg/inst/tests/test_client.R
+++ b/R/pkg/inst/tests/test_client.R
@@ -34,3 +34,12 @@ test_that("no package specified doesn't add packages flag", {
 test_that("multiple packages don't produce a warning", {
   expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning()))
 })
+
+test_that("sparkJars sparkPackages as character vectors", {
+  args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
+                                  c("com.databricks:spark-avro_2.10:2.0.1",
+                                    "com.databricks:spark-csv_2.10:1.3.0"))
+  expect_match(args, "--jars one.jar,two.jar,three.jar")
+  expect_match(args,
+    "--packages com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.3.0")
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/2213441e/R/pkg/inst/tests/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_context.R b/R/pkg/inst/tests/test_context.R
index 80c1b89..1707e31 100644
--- a/R/pkg/inst/tests/test_context.R
+++ b/R/pkg/inst/tests/test_context.R
@@ -92,3 +92,23 @@ test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whiteli
                       " --driver-memory 4g sparkr-shell2"))
   # nolint end
 })
+
+test_that("sparkJars sparkPackages as comma-separated strings", {
+  expect_warning(processSparkJars(" a, b "))
+  jars <- suppressWarnings(processSparkJars(" a, b "))
+  expect_equal(jars, c("a", "b"))
+
+  jars <- suppressWarnings(processSparkJars(" abc ,, def "))
+  expect_equal(jars, c("abc", "def"))
+
+  jars <- suppressWarnings(processSparkJars(c(" abc ,, def ", "", "xyz", " ", "a,b")))
+  expect_equal(jars, c("abc", "def", "xyz", "a", "b"))
+
+  p <- processSparkPackages(c("ghi", "lmn"))
+  expect_equal(p, c("ghi", "lmn"))
+
+  # check normalizePath
+  f <- dir()[[1]]
+  expect_that(processSparkJars(f), not(gives_warning()))
+  expect_match(processSparkJars(f), f)
+})

