You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@amaterasu.apache.org by GitBox <gi...@apache.org> on 2018/05/28 01:25:23 UTC

[GitHub] roadan closed pull request #20: PySpark fixes for YARN and Mesos

roadan closed pull request #20: PySpark fixes for YARN and Mesos
URL: https://github.com/apache/incubator-amaterasu/pull/20
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub no longer
displays its diff once the request is merged, so the diff is supplied below:

diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index 7c9f924..3661b48 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -71,8 +71,8 @@ class ClusterConfig extends Logging {
       var memoryMB: Int = 1024
 
       def load(props: Properties): Unit = {
-        if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").asInstanceOf[Int]
-        if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").asInstanceOf[Int]
+        if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").toInt
+        if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").toInt
       }
     }
 
@@ -83,8 +83,8 @@ class ClusterConfig extends Logging {
       var memoryMB: Int = 1024
 
       def load(props: Properties): Unit = {
-        if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int]
-        if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int]
+        if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").toInt
+        if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").toInt
       }
     }
 
@@ -133,9 +133,9 @@ class ClusterConfig extends Logging {
 
     def load(props: Properties): Unit = {
 
-      if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").asInstanceOf[Double]
-      if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").asInstanceOf[Long]
-      if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").asInstanceOf[Long]
+      if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").toDouble
+      if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").toLong
+      if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").toLong
 
       Tasks.load(props)
     }
@@ -148,9 +148,9 @@ class ClusterConfig extends Logging {
 
       def load(props: Properties): Unit = {
 
-        if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").asInstanceOf[Int]
-        if (props.containsKey("jobs.tasks.cpus")) attempts = props.getProperty("jobs.tasks.cpus").asInstanceOf[Int]
-        if (props.containsKey("jobs.tasks.mem")) attempts = props.getProperty("jobs.tasks.mem").asInstanceOf[Int]
+        if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").toInt
+        if (props.containsKey("jobs.tasks.cpus")) cpus = props.getProperty("jobs.tasks.cpus").toInt
+        if (props.containsKey("jobs.tasks.mem")) mem = props.getProperty("jobs.tasks.mem").toInt
 
       }
     }
@@ -209,7 +209,7 @@ class ClusterConfig extends Logging {
     if (props.containsKey("timeout")) timeout = props.getProperty("timeout").asInstanceOf[Double]
     if (props.containsKey("mode")) mode = props.getProperty("mode")
     if (props.containsKey("workingFolder")) workingFolder = props.getProperty("workingFolder", s"/user/$user")
-
+    if (props.containsKey("pysparkPath")) pysparkPath = props.getProperty("pysparkPath")
     // TODO: rethink this
     Jar = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
     JarName = Paths.get(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getFileName.toString
diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py
index 0faae2b..f3c9fc0 100755
--- a/executor/src/main/resources/spark_intp.py
+++ b/executor/src/main/resources/spark_intp.py
@@ -21,6 +21,7 @@
 import os
 import sys
 import zipimport
+sys.path.append(os.getcwd())
 from runtime import AmaContext, Environment
 
 # os.chdir(os.getcwd() + '/build/resources/test/')
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
index 94b8056..79fe18a 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
@@ -16,19 +16,21 @@
  */
 package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
 
-import java.io.{File, PrintWriter, StringWriter}
+import java.io.File
 import java.util
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.sdk.AmaterasuRunner
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
 
-import scala.sys.process.Process
+import scala.sys.process.{Process, ProcessLogger}
+
+
 
 
 class PySparkRunner extends AmaterasuRunner with Logging {
@@ -69,6 +71,15 @@ class PySparkRunner extends AmaterasuRunner with Logging {
 
 object PySparkRunner {
 
+  def collectCondaPackages(): String = {
+    val pkgsDirs = new File("./miniconda/pkgs")
+    (pkgsDirs.listFiles.filter {
+      file => file.getName.endsWith(".tar.bz2")
+    }.map {
+      file => s"./miniconda/pkgs/${file.getName}"
+    }.toBuffer ++ "dist/codegen.py").mkString(",")
+  }
+
   def apply(env: Environment,
             jobId: String,
             notifier: Notifier,
@@ -77,14 +88,13 @@ object PySparkRunner {
             pyDeps: PythonDependencies,
             config: ClusterConfig): PySparkRunner = {
 
+    val shellLoger = ProcessLogger(
+      (o: String) => println(o),
+      (e: String) => println(e)
+    )
+
     //TODO: can we make this less ugly?
-    var pysparkPython = "/usr/bin/python"
 
-    if (pyDeps != null &&
-        pyDeps.packages.nonEmpty) {
-      loadPythonDependencies(pyDeps, notifier)
-      pysparkPython = "miniconda/bin/python"
-    }
 
     val result = new PySparkRunner
 
@@ -98,87 +108,44 @@ object PySparkRunner {
       intpPath = s"spark_intp.py"
     }
     var pysparkPath = ""
-    if (env.configuration.contains("pysparkPath")) {
-      pysparkPath = env.configuration("pysparkPath")
-    } else {
-      pysparkPath = s"${config.spark.home}/bin/spark-submit"
-    }
-    val proc = Process(Seq(pysparkPath, intpPath, port.toString), None,
-      "PYTHONPATH" -> pypath,
-      "PYSPARK_PYTHON" -> pysparkPython,
-      "PYTHONHASHSEED" -> 0.toString) #> System.out
+    var condaPkgs = ""
+    if (pyDeps != null)
+      condaPkgs = collectCondaPackages()
+    var sparkCmd: Seq[String] = Seq()
+    config.mode match {
+      case "yarn" =>
+        pysparkPath = s"spark/bin/spark-submit"
+        sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYTHONHASHSEED" -> 0.toString)
+
+        proc.run(shellLoger)
+      case "mesos" =>
+        pysparkPath = config.pysparkPath
+        if (pysparkPath.endsWith("spark-submit")) {
+          sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
+        }
+        else {
+          sparkCmd = Seq(pysparkPath, intpPath, port.toString)
+        }
+        var pysparkPython = "/usr/bin/python"
 
-    proc.run()
+        if (pyDeps != null &&
+          pyDeps.packages.nonEmpty) {
+          pysparkPython = "./miniconda/bin/python"
+        }
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYSPARK_PYTHON" -> pysparkPython,
+        "PYTHONHASHSEED" -> 0.toString)
 
+        proc.run(shellLoger)
+    }
 
     result.notifier = notifier
 
     result
   }
 
-  /**
-    * This installs the required python dependencies.
-    * We basically need 2 packages to make pyspark work with customer's scripts:
-    * 1. py4j - supplied by spark, for communication between Python and Java runtimes.
-    * 2. codegen - for dynamically parsing and converting customer's scripts into executable Python code objects.
-    * Currently we only know how to install packages using Anaconda, the reason is 3rd party OS libraries, e.g. libevent
-    * Anaconda has the capabilities to automatically resolve the required OS libraries per Python package and install them.
-    *
-    * TODO - figure out if we really want to support pip directly, or if Anaconda is enough.
-    * @param deps All of the customer's supplied Python dependencies, this currently comes from job-repo/deps/python.yml
-    * @param notifier
-    */
-  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
-    notifier.info("loading anaconda evn")
-    installAnacondaOnNode()
-    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
-    installAnacondaPackage(codegenPackage)
-    try {
-      deps.packages.foreach(pack => {
-        pack.index.getOrElse("anaconda").toLowerCase match {
-          case "anaconda" => installAnacondaPackage(pack)
-          // case "pypi" => installPyPiPackage(pack) TODO: See if we can support this
-        }
-      })
-    }
-    catch {
-
-      case rte: RuntimeException =>
-        val sw = new StringWriter
-        rte.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
-      case e: Exception =>
-        val sw = new StringWriter
-        e.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
-    }
-  }
-
-
-  /**
-    * Installs one python package using Anaconda.
-    * Anaconda works with multiple channels, or better called, repositories.
-    * Normally, if a channel isn't specified, Anaconda will fetch the package from the default conda channel.
-    * The reason we need to use channels, is that sometimes the required package doesn't exist on the default channel.
-    * @param pythonPackage This comes from parsing the python.yml dep file.
-    */
-  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
-    val channel = pythonPackage.channel.getOrElse("anaconda")
-    if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}")
-    } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}")
-    }
-  }
-
-  /**
-    * Installs Anaconda and then links it with the local spark that was installed on the executor.
-    */
-  private def installAnacondaOnNode(): Unit = {
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda")
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build")
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
-  }
-
-
 }
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
index ff56d8c..ba7ff03 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
@@ -47,6 +47,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
   )
   private var conf: Option[Map[String, Any]] = _
   private var executorEnv: Option[Map[String, Any]] = _
+  private var clusterConfig: ClusterConfig = _
 
   override def init(execData: ExecData,
                     jobId: String,
@@ -60,7 +61,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
       (o: String) => log.info(o),
       (e: String) => log.error("", e)
     )
-
+    clusterConfig = config
     var jars = Seq.empty[String]
 
     if (execData.deps != null) {
@@ -83,9 +84,15 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
     sparkScalaRunner.initializeAmaContext(execData.env)
 
     runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
-
+    var pypath = ""
     // TODO: get rid of hard-coded version
-    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", execData.pyDeps, config)
+    config.mode match {
+      case "yarn" =>
+        pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
+      case "mesos" =>
+        pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
+    }
+    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
     runners.put(pySparkRunner.getIdentifier, pySparkRunner)
 
     lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
@@ -95,17 +102,22 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
   private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
     val channel = pythonPackage.channel.getOrElse("anaconda")
     if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
     } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
     }
   }
 
   private def installAnacondaOnNode(): Unit = {
     // TODO: get rid of hard-coded version
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger
+
+    this.clusterConfig.mode match {
+      case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
+      case "mesos" => Seq("sh", "Miniconda2-latest-Linux-x86_64.sh", "-b", "-p", "miniconda") ! shellLoger
+    }
+
+    Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
+    Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
   }
 
   private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
index 0bf7337..f2c2afa 100644
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
@@ -152,9 +152,9 @@ object SparkRunnerHelper extends Logging {
           .set("spark.history.kerberos.principal", "none")
 
           .set("spark.master", master)
-          .set("spark.executor.instances", "1") // TODO: change this
+          .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
           .set("spark.yarn.jars", s"spark/jars/*")
-          .set("spark.executor.memory", "1g")
+          .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
           .set("spark.dynamicAllocation.enabled", "false")
           .set("spark.eventLog.enabled", "false")
           .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
diff --git a/executor/src/test/resources/amaterasu.properties b/executor/src/test/resources/amaterasu.properties
index 19cb189..d402fed 100755
--- a/executor/src/test/resources/amaterasu.properties
+++ b/executor/src/test/resources/amaterasu.properties
@@ -6,3 +6,4 @@ mode=mesos
 webserver.port=8000
 webserver.root=dist
 spark.version=2.1.1-bin-hadoop2.7
+pysparkPath = /usr/bin/python
diff --git a/executor/src/test/resources/spark_intp.py b/executor/src/test/resources/spark_intp.py
index a427e92..fd8dc0e 100755
--- a/executor/src/test/resources/spark_intp.py
+++ b/executor/src/test/resources/spark_intp.py
@@ -31,6 +31,7 @@
 zip.extractall()
 sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
 sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
+sys.path.append(os.getcwd())
 
 # py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
 # py4j_importer = zipimport.zipimporter(py4j_path)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services