Posted to commits@spark.apache.org by sh...@apache.org on 2015/07/13 17:21:52 UTC

spark git commit: [SPARK-6797] [SPARKR] Add support for YARN cluster mode.

Repository: spark
Updated Branches:
  refs/heads/master a5bc803b7 -> 7f487c8bd


[SPARK-6797] [SPARKR] Add support for YARN cluster mode.

This PR enables SparkR to dynamically ship the SparkR binary package to the AM node in YARN cluster mode, so the SparkR package no longer needs to be pre-installed on each worker node.
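
For example, after this change an R application can be submitted in YARN cluster mode roughly as follows (a minimal sketch; the application script name is hypothetical). spark-submit then adds $SPARK_HOME/R/lib/sparkr.zip to the YARN archives automatically, as shown in the SparkSubmit.scala diff below:

  ./bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    my_sparkr_app.R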

This PR uses the JDK jar tool to package the SparkR package, because jar should be available on both Linux and Windows wherever a JDK is installed.
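
For reference, the packaging step this adds to R/install-dev.sh boils down to the following (mirroring the diff below; SPARK_HOME is assumed to point at the Spark checkout):

  # Zip the installed SparkR package with the JDK jar tool; "cfM" creates
  # the named archive without adding a manifest, so the result is a plain
  # zip file that YARN can distribute and unpack on worker nodes.
  cd "$SPARK_HOME/R/lib"
  jar cfM "$SPARK_HOME/R/lib/sparkr.zip" SparkR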

This PR does not address the R workers used by the RDD API; that will be handled in a separate JIRA issue.

This PR does not address the SBT build. SparkR installation and packaging under SBT will be addressed in a separate JIRA issue.

R/install-dev.bat is not tested. shivaram, could you help test it?

Author: Sun Rui <ru...@intel.com>

Closes #6743 from sun-rui/SPARK-6797 and squashes the following commits:

ca63c86 [Sun Rui] Adjust MimaExcludes after rebase.
7313374 [Sun Rui] Fix unit test errors.
72695fb [Sun Rui] Fix unit test failures.
193882f [Sun Rui] Fix Mima test error.
fe25a33 [Sun Rui] Fix Mima test error.
35ecfa3 [Sun Rui] Fix comments.
c38a005 [Sun Rui] Unzipped SparkR binary package is still required for standalone and Mesos modes.
b05340c [Sun Rui] Fix scala style.
2ca5048 [Sun Rui] Fix comments.
1acefd1 [Sun Rui] Fix scala style.
0aa1e97 [Sun Rui] Fix scala style.
41d4f17 [Sun Rui] Add support for locating SparkR package for R workers required by RDD APIs.
49ff948 [Sun Rui] Invoke jar.exe with full path in install-dev.bat.
7b916c5 [Sun Rui] Use 'rem' consistently.
3bed438 [Sun Rui] Add a comment.
681afb0 [Sun Rui] Fix a bug that RRunner does not handle client deployment modes.
cedfbe2 [Sun Rui] [SPARK-6797][SPARKR] Add support for YARN cluster mode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f487c8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f487c8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f487c8b

Branch: refs/heads/master
Commit: 7f487c8bde14dbdd244a3493ad11a129ef2bb327
Parents: a5bc803
Author: Sun Rui <ru...@intel.com>
Authored: Mon Jul 13 08:21:47 2015 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Mon Jul 13 08:21:47 2015 -0700

----------------------------------------------------------------------
 R/install-dev.bat                               |  5 ++
 R/install-dev.sh                                |  8 ++-
 R/pkg/DESCRIPTION                               |  1 -
 R/pkg/R/RDD.R                                   |  2 -
 R/pkg/R/pairRDD.R                               |  1 -
 R/pkg/R/sparkR.R                                | 10 ---
 R/pkg/R/zzz.R                                   | 20 ------
 R/pkg/inst/profile/general.R                    |  4 +-
 .../scala/org/apache/spark/api/r/RRDD.scala     | 21 +++----
 .../scala/org/apache/spark/api/r/RUtils.scala   | 65 ++++++++++++++++++++
 .../scala/org/apache/spark/deploy/RRunner.scala |  7 ++-
 .../org/apache/spark/deploy/SparkSubmit.scala   | 27 ++++++++
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  3 +-
 make-distribution.sh                            |  1 +
 project/MimaExcludes.scala                      | 12 ++++
 15 files changed, 133 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/R/install-dev.bat
----------------------------------------------------------------------
diff --git a/R/install-dev.bat b/R/install-dev.bat
index 008a5c6..f32670b 100644
--- a/R/install-dev.bat
+++ b/R/install-dev.bat
@@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0..
 MKDIR %SPARK_HOME%\R\lib
 
 R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib"  %SPARK_HOME%\R\pkg\
+
+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/R/install-dev.sh
----------------------------------------------------------------------
diff --git a/R/install-dev.sh b/R/install-dev.sh
index 1edd551..4972bb9 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -34,7 +34,7 @@ LIB_DIR="$FWDIR/lib"
 
 mkdir -p $LIB_DIR
 
-pushd $FWDIR
+pushd $FWDIR > /dev/null
 
 # Generate Rd files if devtools is installed
 Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'
@@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
 # Install SparkR to $LIB_DIR
 R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
-popd
+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
+popd > /dev/null

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/R/pkg/DESCRIPTION
----------------------------------------------------------------------
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index efc85bb..d028821 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -32,4 +32,3 @@ Collate:
     'serialize.R'
     'sparkR.R'
     'utils.R'
-    'zzz.R'

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 8951114..d2d0967 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -165,7 +165,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
                                    serializedFuncArr,
                                    rdd@env$prev_serializedMode,
                                    packageNamesArr,
-                                   as.character(.sparkREnv[["libname"]]),
                                    broadcastArr,
                                    callJMethod(prev_jrdd, "classTag"))
             } else {
@@ -175,7 +174,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
                                    rdd@env$prev_serializedMode,
                                    serializedMode,
                                    packageNamesArr,
-                                   as.character(.sparkREnv[["libname"]]),
                                    broadcastArr,
                                    callJMethod(prev_jrdd, "classTag"))
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 0f1179e..ebc6ff6 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -215,7 +215,6 @@ setMethod("partitionBy",
                                        serializedHashFuncBytes,
                                        getSerializedMode(x),
                                        packageNamesArr,
-                                       as.character(.sparkREnv$libname),
                                        broadcastArr,
                                        callJMethod(jrdd, "classTag"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/R/pkg/R/sparkR.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 048eb8e..1723358 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -17,10 +17,6 @@
 
 .sparkREnv <- new.env()
 
-sparkR.onLoad <- function(libname, pkgname) {
-  .sparkREnv$libname <- libname
-}
-
 # Utility function that returns TRUE if we have an active connection to the
 # backend and FALSE otherwise
 connExists <- function(env) {
@@ -80,7 +76,6 @@ sparkR.stop <- function() {
 #' @param sparkEnvir Named list of environment variables to set on worker nodes.
 #' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
 #' @param sparkJars Character string vector of jar files to pass to the worker nodes.
-#' @param sparkRLibDir The path where R is installed on the worker nodes.
 #' @param sparkPackages Character string vector of packages from spark-packages.org
 #' @export
 #' @examples
@@ -101,7 +96,6 @@ sparkR.init <- function(
   sparkEnvir = list(),
   sparkExecutorEnv = list(),
   sparkJars = "",
-  sparkRLibDir = "",
   sparkPackages = "") {
 
   if (exists(".sparkRjsc", envir = .sparkREnv)) {
@@ -170,10 +164,6 @@ sparkR.init <- function(
     sparkHome <- normalizePath(sparkHome)
   }
 
-  if (nchar(sparkRLibDir) != 0) {
-    .sparkREnv$libname <- sparkRLibDir
-  }
-
   sparkEnvirMap <- new.env()
   for (varname in names(sparkEnvir)) {
     sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/R/pkg/R/zzz.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/zzz.R b/R/pkg/R/zzz.R
deleted file mode 100644
index 301fead..0000000
--- a/R/pkg/R/zzz.R
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-.onLoad <- function(libname, pkgname) {
-  sparkR.onLoad(libname, pkgname)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/R/pkg/inst/profile/general.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/profile/general.R b/R/pkg/inst/profile/general.R
index 8fe711b..2a8a821 100644
--- a/R/pkg/inst/profile/general.R
+++ b/R/pkg/inst/profile/general.R
@@ -16,7 +16,7 @@
 #
 
 .First <- function() {
-  home <- Sys.getenv("SPARK_HOME")
-  .libPaths(c(file.path(home, "R", "lib"), .libPaths()))
+  packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
+  .libPaths(c(packageDir, .libPaths()))
   Sys.setenv(NOAWT=1)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index ff1702f..23a470d 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -39,7 +39,6 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
     deserializer: String,
     serializer: String,
     packageNames: Array[Byte],
-    rLibDir: String,
     broadcastVars: Array[Broadcast[Object]])
   extends RDD[U](parent) with Logging {
   protected var dataStream: DataInputStream = _
@@ -60,7 +59,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
 
     // The stdout/stderr is shared by multiple tasks, because we use one daemon
     // to launch child process as worker.
-    val errThread = RRDD.createRWorker(rLibDir, listenPort)
+    val errThread = RRDD.createRWorker(listenPort)
 
     // We use two sockets to separate input and output, then it's easy to manage
     // the lifecycle of them to avoid deadlock.
@@ -235,11 +234,10 @@ private class PairwiseRRDD[T: ClassTag](
     hashFunc: Array[Byte],
     deserializer: String,
     packageNames: Array[Byte],
-    rLibDir: String,
     broadcastVars: Array[Object])
   extends BaseRRDD[T, (Int, Array[Byte])](
     parent, numPartitions, hashFunc, deserializer,
-    SerializationFormats.BYTE, packageNames, rLibDir,
+    SerializationFormats.BYTE, packageNames,
     broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
 
   override protected def readData(length: Int): (Int, Array[Byte]) = {
@@ -266,10 +264,9 @@ private class RRDD[T: ClassTag](
     deserializer: String,
     serializer: String,
     packageNames: Array[Byte],
-    rLibDir: String,
     broadcastVars: Array[Object])
   extends BaseRRDD[T, Array[Byte]](
-    parent, -1, func, deserializer, serializer, packageNames, rLibDir,
+    parent, -1, func, deserializer, serializer, packageNames,
     broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
 
   override protected def readData(length: Int): Array[Byte] = {
@@ -293,10 +290,9 @@ private class StringRRDD[T: ClassTag](
     func: Array[Byte],
     deserializer: String,
     packageNames: Array[Byte],
-    rLibDir: String,
     broadcastVars: Array[Object])
   extends BaseRRDD[T, String](
-    parent, -1, func, deserializer, SerializationFormats.STRING, packageNames, rLibDir,
+    parent, -1, func, deserializer, SerializationFormats.STRING, packageNames,
     broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
 
   override protected def readData(length: Int): String = {
@@ -392,9 +388,10 @@ private[r] object RRDD {
     thread
   }
 
-  private def createRProcess(rLibDir: String, port: Int, script: String): BufferedStreamThread = {
+  private def createRProcess(port: Int, script: String): BufferedStreamThread = {
     val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript")
     val rOptions = "--vanilla"
+    val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
     val rExecScript = rLibDir + "/SparkR/worker/" + script
     val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
     // Unset the R_TESTS environment variable for workers.
@@ -413,7 +410,7 @@ private[r] object RRDD {
   /**
    * ProcessBuilder used to launch worker R processes.
    */
-  def createRWorker(rLibDir: String, port: Int): BufferedStreamThread = {
+  def createRWorker(port: Int): BufferedStreamThread = {
     val useDaemon = SparkEnv.get.conf.getBoolean("spark.sparkr.use.daemon", true)
     if (!Utils.isWindows && useDaemon) {
       synchronized {
@@ -421,7 +418,7 @@ private[r] object RRDD {
           // we expect one connections
           val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
           val daemonPort = serverSocket.getLocalPort
-          errThread = createRProcess(rLibDir, daemonPort, "daemon.R")
+          errThread = createRProcess(daemonPort, "daemon.R")
           // the socket used to send out the input of task
           serverSocket.setSoTimeout(10000)
           val sock = serverSocket.accept()
@@ -443,7 +440,7 @@ private[r] object RRDD {
         errThread
       }
     } else {
-      createRProcess(rLibDir, port, "worker.R")
+      createRProcess(port, "worker.R")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
new file mode 100644
index 0000000..d53abd3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+import java.io.File
+
+import org.apache.spark.{SparkEnv, SparkException}
+
+private[spark] object RUtils {
+  /**
+   * Get the SparkR package path in the local spark distribution.
+   */
+  def localSparkRPackagePath: Option[String] = {
+    val sparkHome = sys.env.get("SPARK_HOME")
+    sparkHome.map(
+      Seq(_, "R", "lib").mkString(File.separator)
+    )
+  }
+
+  /**
+   * Get the SparkR package path in various deployment modes.
+   * This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
+   * and environment variable `SPARK_HOME` are set.
+   */
+  def sparkRPackagePath(isDriver: Boolean): String = {
+    val (master, deployMode) =
+      if (isDriver) {
+        (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
+      } else {
+        val sparkConf = SparkEnv.get.conf
+        (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
+      }
+
+    val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
+    val isYarnClient = master.contains("yarn") && deployMode == "client"
+
+    // In YARN mode, the SparkR package is distributed as an archive symbolically
+    // linked to the "sparkr" file in the current directory. Note that this does not apply
+    // to the driver in client mode because it is run outside of the cluster.
+    if (isYarnCluster || (isYarnClient && !isDriver)) {
+      new File("sparkr").getAbsolutePath
+    } else {
+      // Otherwise, assume the package is local
+      // TODO: support this for Mesos
+      localSparkRPackagePath.getOrElse {
+        throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index 4165740..c0cab22 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.api.r.RBackend
+import org.apache.spark.api.r.{RBackend, RUtils}
 import org.apache.spark.util.RedirectThread
 
 /**
@@ -71,9 +71,10 @@ object RRunner {
         val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
         val env = builder.environment()
         env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
-        val sparkHome = System.getenv("SPARK_HOME")
+        val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
+        env.put("SPARKR_PACKAGE_DIR", rPackageDir)
         env.put("R_PROFILE_USER",
-          Seq(sparkHome, "R", "lib", "SparkR", "profile", "general.R").mkString(File.separator))
+          Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator))
         builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
         val process = builder.start()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4cec901..7089a7e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -37,6 +37,7 @@ import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
+import org.apache.spark.api.r.RUtils
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -79,6 +80,7 @@ object SparkSubmit {
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
   private val SPARKR_SHELL = "sparkr-shell"
+  private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
 
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
@@ -262,6 +264,12 @@ object SparkSubmit {
       }
     }
 
+    // Update args.deployMode if it is null. It will be passed down as a Spark property later.
+    (args.deployMode, deployMode) match {
+      case (null, CLIENT) => args.deployMode = "client"
+      case (null, CLUSTER) => args.deployMode = "cluster"
+      case _ =>
+    }
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
 
@@ -347,6 +355,23 @@ object SparkSubmit {
       }
     }
 
+    // In YARN mode for an R app, add the SparkR package archive to archives
+    // that can be distributed with the job
+    if (args.isR && clusterManager == YARN) {
+      val rPackagePath = RUtils.localSparkRPackagePath
+      if (rPackagePath.isEmpty) {
+        printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
+      }
+      val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+      if (!rPackageFile.exists()) {
+        printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
+      }
+      val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
+
+      // Assigns a symbol link name "sparkr" to the shipped package.
+      args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
+    }
+
     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
@@ -375,6 +400,8 @@ object SparkSubmit {
 
       // All cluster managers
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
+      OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
+        sysProp = "spark.submit.deployMode"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
       OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
       OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1b64c32..e7878bd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -246,7 +246,7 @@ class SparkSubmitSuite
       mainClass should be ("org.apache.spark.deploy.Client")
     }
     classpath should have size 0
-    sysProps should have size 8
+    sysProps should have size 9
     sysProps.keys should contain ("SPARK_SUBMIT")
     sysProps.keys should contain ("spark.master")
     sysProps.keys should contain ("spark.app.name")
@@ -255,6 +255,7 @@ class SparkSubmitSuite
     sysProps.keys should contain ("spark.driver.cores")
     sysProps.keys should contain ("spark.driver.supervise")
     sysProps.keys should contain ("spark.shuffle.spill")
+    sysProps.keys should contain ("spark.submit.deployMode")
     sysProps("spark.shuffle.spill") should be ("false")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/make-distribution.sh
----------------------------------------------------------------------
diff --git a/make-distribution.sh b/make-distribution.sh
index 9f063da..cac7032 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -219,6 +219,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR"
 if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then
   mkdir -p "$DISTDIR"/R/lib
   cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib
+  cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib
 fi
 
 # Download and copy in tachyon, if requested

http://git-wip-us.apache.org/repos/asf/spark/blob/7f487c8b/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 79089aa..4e4e810 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -83,6 +83,18 @@ object MimaExcludes {
               "org.apache.spark.streaming.scheduler.InputInfo$"),
             ProblemFilters.exclude[MissingClassProblem](
               "org.apache.spark.streaming.scheduler.InputInfo")
+          ) ++ Seq(
+            // SPARK-6797 Support YARN modes for SparkR
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.r.PairwiseRRDD.this"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.r.RRDD.createRWorker"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.r.RRDD.this"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.r.StringRRDD.this"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.r.BaseRRDD.this")
           )
 
         case v if v.startsWith("1.4") =>

