Posted to commits@spark.apache.org by va...@apache.org on 2017/07/13 17:37:21 UTC

spark git commit: [SPARK-21403][MESOS] fix --packages for mesos

Repository: spark
Updated Branches:
  refs/heads/master af80e01b5 -> d8257b99d


[SPARK-21403][MESOS] fix --packages for mesos

## What changes were proposed in this pull request?
Fixes the --packages flag for Mesos in cluster mode. Standalone and YARN will likely be handled in a separate commit; those cases differ and still need investigation.
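
For context, a minimal submission that exercises the fixed path might look like the following; the dispatcher host, application jar, and package coordinate are illustrative placeholders, not taken from this patch:

    ./bin/spark-submit \
      --master mesos://dispatcher-host:7077 \
      --deploy-mode cluster \
      --class com.example.Main \
      --packages com.example:mylib_2.11:1.0.0 \
      http://example.com/app.jar

With this change, the --packages coordinates are no longer resolved on the submitting machine in Mesos cluster mode; they are forwarded to the driver and resolved inside its container.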

## How was this patch tested?
Tested with a community DC/OS 1.9 cluster. Packages were successfully resolved in cluster mode from within a container.

andrewor14 susanxhuynh ArtRand srowen please review.

Author: Stavros Kontopoulos <st...@gmail.com>

Closes #18587 from skonto/fix_packages_mesos_cluster.
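
The essence of the change, condensed from the diff below (a sketch; identifiers match SparkSubmit.scala, but the surrounding resolution code is elided):

    // In Mesos cluster mode, skip client-side Ivy/Maven resolution entirely.
    if (!isMesosCluster) {
      // Existing path: resolve --packages locally and merge the resolved
      // jars into args.jars (and args.pyFiles for Python applications).
      // ... unchanged resolution logic ...
    }

    // New OptionAssigner entries forward the flags as Spark properties so
    // that the driver resolves the dependencies itself, inside its container:
    OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages")
    OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories")
    OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy")
    OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes")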


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8257b99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8257b99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8257b99

Branch: refs/heads/master
Commit: d8257b99ddae23f702f312640a5335ddb4554403
Parents: af80e01
Author: Stavros Kontopoulos <st...@gmail.com>
Authored: Thu Jul 13 10:37:15 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Jul 13 10:37:15 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 89 +++++++++++---------
 1 file changed, 50 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8257b99/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index abde040..0ea1436 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -273,6 +273,25 @@ object SparkSubmit extends CommandLineUtils {
       }
     }
 
+    // Fail fast, the following modes are not supported or applicable
+    (clusterManager, deployMode) match {
+      case (STANDALONE, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on standalone clusters.")
+      case (STANDALONE, CLUSTER) if args.isR =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
+          "applications on standalone clusters.")
+      case (LOCAL, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
+      case (_, CLUSTER) if isShell(args.primaryResource) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
+      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
+      case _ =>
+    }
+
     // Update args.deployMode if it is null. It will be passed down as a Spark property later.
     (args.deployMode, deployMode) match {
       case (null, CLIENT) => args.deployMode = "client"
@@ -282,36 +301,40 @@ object SparkSubmit extends CommandLineUtils {
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
 
-    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-    // too for packages that include Python code
-    val exclusions: Seq[String] =
+    if (!isMesosCluster) {
+      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
+      // too for packages that include Python code
+      val exclusions: Seq[String] =
       if (!StringUtils.isBlank(args.packagesExclusions)) {
         args.packagesExclusions.split(",")
       } else {
         Nil
       }
 
-    // Create the IvySettings, either load from file or build defaults
-    val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
-      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
-        Option(args.ivyRepoPath))
-    }.getOrElse {
-      SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
-    }
+      // Create the IvySettings, either load from file or build defaults
+      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
+        SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
+          Option(args.ivyRepoPath))
+      }.getOrElse {
+        SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
+      }
 
-    val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-      ivySettings, exclusions = exclusions)
-    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-      if (args.isPython) {
-        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+      val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
+        ivySettings, exclusions = exclusions)
+
+
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+        if (args.isPython) {
+          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+        }
       }
-    }
 
-    // install any R packages that may have been passed through --jars or --packages.
-    // Spark Packages may contain R source code inside the jar.
-    if (args.isR && !StringUtils.isBlank(args.jars)) {
-      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      // install any R packages that may have been passed through --jars or --packages.
+      // Spark Packages may contain R source code inside the jar.
+      if (args.isR && !StringUtils.isBlank(args.jars)) {
+        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      }
     }
 
     val hadoopConf = new HadoopConfiguration()
@@ -343,24 +366,6 @@ object SparkSubmit extends CommandLineUtils {
       }.orNull
     }
 
-    // The following modes are not supported or applicable
-    (clusterManager, deployMode) match {
-      case (STANDALONE, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
-          "applications on standalone clusters.")
-      case (STANDALONE, CLUSTER) if args.isR =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
-          "applications on standalone clusters.")
-      case (LOCAL, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
-      case (_, CLUSTER) if isShell(args.primaryResource) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
-      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
-      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
-      case _ =>
-    }
 
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
@@ -468,6 +473,12 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
+      // Mesos only - propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org