Posted to commits@amaterasu.apache.org by ya...@apache.org on 2019/05/06 02:40:18 UTC

[incubator-amaterasu] 02/36: virtual env is now available

This is an automated email from the ASF dual-hosted git repository.

yaniv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git

commit 3a872be239c435d27a778b7a814eb0728e3df8b7
Merge: b53d7c0 b0b6472
Author: Yaniv Rodenski <ya...@shinto.io>
AuthorDate: Thu Apr 18 23:31:48 2019 +1000

    virtual env is now available

 common/build.gradle                                |  12 ++-
 .../org/apache/amaterasu/common/utils/FileUtil.kt  | 100 +++++++++++++++++++++
 .../apache/amaterasu/common/utils/FileTestUtils.kt |  25 ++++++
 .../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip | Bin 8304 -> 8304 bytes
 .../dist/amaterasu_python-0.2.0-incubating-rc4.zip | Bin 6167 -> 6167 bytes
 .../runners/providers/PySparkRunnerProvider.scala  |  10 +--
 .../providers/SparkSubmitScalaRunnerProvider.scala |  15 ++--
 .../amaterasu_pyspark-0.2.0-incubating-rc4.zip     | Bin 14488 -> 14488 bytes
 .../amaterasu/leader/yarn/ApplicationMaster.kt     |  12 ++-
 .../leader/mesos/schedulers/JobScheduler.scala     |  40 +++++++--
 .../sdk/frameworks/RunnerSetupProvider.kt          |  31 +++++--
 .../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip    | Bin 14898 -> 14898 bytes
 12 files changed, 213 insertions(+), 32 deletions(-)

diff --cc frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
index 69a9002,0000000..f4aa8c6
mode 100644,000000..100644
Binary files differ
diff --cc frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
index 448adf5,0000000..f62dc5a
mode 100644,000000..100644
Binary files differ
diff --cc frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 465691c,d0a2442..fd868ac
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@@ -1,31 -1,48 +1,31 @@@
  package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
  
 -import java.net.URLEncoder
 -
  import org.apache.amaterasu.common.configuration.ClusterConfig
  import org.apache.amaterasu.common.dataobjects.ActionData
 -import org.apache.amaterasu.leader.common.utilities.DataLoader
 -import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
 -import org.apache.commons.lang.StringUtils
 -import org.apache.hadoop.yarn.api.ApplicationConstants
 -
 -class PySparkRunnerProvider extends RunnerSetupProvider {
 -
 -  private var conf: ClusterConfig = _
 -  private val libPath = System.getProperty("java.library.path")
 -
 -  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match {
 -    case "mesos" =>
 -      s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " +
 -      s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " +
 -      s"-Dscala.usejavacp=true -Djava.library.path=$libPath org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}.stripMargin"
 -    case "yarn" => "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && " +
 -      s"/bin/bash ${StringUtils.stripStart(conf.spark.home,"/")}/conf/load-spark.sh && " +
 -      s"java -cp ${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/ " +
 -      "-Xmx2G " +
 -      "-Dscala.usejavacp=true " +
 -      "-Dhdp.version=2.6.5.0-292 " +
 -      "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
 -      s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
 -      s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
 -      s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
 -    case _ => ""
 -  }
 +import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
- import org.apache.commons.lang.StringUtils
  
 -  override def getRunnerResources: Array[String] =
 -    Array[String]("miniconda.sh", "spark_intp.py", "runtime.py", "codegen.py")
 +class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) {
  
 -  override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] =
 -    Array[String]()
 +  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
-     //val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
++    val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
 +    log.info(s"===> Cluster manager: ${conf.mode}")
-     //command +
-      // s"$$SPARK_HOME/conf/spark-env.sh && env PYSPARK_PYTHON=$getVirtualPythonPath " +
-       s"$$SPARK_HOME/bin/spark-submit ${actionData.getSrc}"
++    command +
++      //s" $$SPARK_HOME/conf/spark-env.sh &&" +
++      s" && env PYSPARK_PYTHON=$getVirtualPythonPath" +
++      s" $$SPARK_HOME/bin/spark-submit ${actionData.getSrc}"
 +  }
 +
 +  override def getRunnerResources: Array[String] = {
 +    var resources = super.getRunnerResources
 +    resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip"
 +    log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}")
 +    resources
 +  }
  
 -  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
 -    Array[String]()
  
 -  override def getHasExecutor: Boolean = true
 +  override def getHasExecutor: Boolean = false
  
 +  override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]()
  }
  
  object PySparkRunnerProvider {
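
A note for readers skimming the diff above: the rewritten provider delegates the virtual-env bootstrap to PythonRunnerProviderBase and only appends the PySpark-specific submit step. Below is a minimal sketch of that composition in plain Scala, where baseCommand stands in for super.getCommand and virtualPythonPath for getVirtualPythonPath (the argument values are illustrative, not Amaterasu's real ones):

    // Sketch only: compose the base setup command with the PySpark
    // submit step, mirroring the new getCommand above.
    object PySparkCommandSketch {
      def compose(baseCommand: String, virtualPythonPath: String, src: String): String =
        baseCommand +
          s" && env PYSPARK_PYTHON=$virtualPythonPath" +
          s" $$SPARK_HOME/bin/spark-submit $src"

      def main(args: Array[String]): Unit =
        // Hypothetical values, for illustration only.
        println(compose("python -m venv ama_env", "ama_env/bin/python", "job.py"))
    }

The result is a single shell command: the environment is set up first, and only if that succeeds does spark-submit run with PYSPARK_PYTHON pointing into the virtual env.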
diff --cc frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index 9855261,0000000..c8e66c6
mode 100644,000000..100644
Binary files differ
diff --cc leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index cd4bf03,3e8a7b6..77c17d1
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@@ -249,8 -243,6 +248,8 @@@ class ApplicationMaster : KLogging(), A
  
                          jobManager.actionStarted(actionData.id)
                          containersIdsToTask[container.id.containerId] = actionData
 +                        notifier.info("container for ${actionData.name} created")
-                         //ctx.localResources.forEach { t: String, u: LocalResource ->  notifier.info("resource: $t = ${u.resource}") }
++                        ctx.localResources.forEach { t: String, u: LocalResource ->  notifier.info("resource: $t = ${u.resource}") }
                          log.info("launching container succeeded: ${container.id.containerId}; task: ${actionData.id}")
                      }
                  }
diff --cc leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index a87ea5d,464e3bf..f9b1060
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@@ -17,8 -17,9 +17,9 @@@
  package org.apache.amaterasu.leader.mesos.schedulers
  
  import java.io.{File, PrintWriter, StringWriter}
+ import java.nio.file.{Files, Path, Paths, StandardCopyOption}
  import java.util
 -import java.util.UUID
 +import java.util.{Collections, UUID}
  import java.util.concurrent.locks.ReentrantLock
  import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
  
@@@ -247,9 -248,33 +252,28 @@@ class JobScheduler extends AmaterasuSch
                .setExtract(false)
                .build()))
  
+             // setting up action executable
+             val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData))
+             var executable: Path = null
+             if (actionData.getHasArtifact) {
+               val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath)
+               executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount)
+             } else {
+               val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}")
+               FileUtils.moveFile(sourcePath, dest)
+               executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString)
+             }
+ 
+             println(s"===> executable $executable")
+             command.addUris(URI.newBuilder
 -              .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$executable")
++              .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$executable")
+               .setExecutable(false)
+               .setExtract(false)
+               .build())
+ 
              command
                .addUris(URI.newBuilder()
 -                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
 -                .setExecutable(true)
 -                .setExtract(false)
 -                .build())
 -              .addUris(URI.newBuilder()
 -                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
 +                .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/amaterasu.properties")
                  .setExecutable(false)
                  .setExtract(false)
                  .build())
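
The executable handling added in this hunk relativizes the artifact path against the amaterasu dist directory, so the URI served from the leader's webserver stays relative to its document root. Here is a standalone sketch of the same java.nio.file calls, with made-up paths (the real ones come from runnerProvider.getActionExecutable and amaDist):

    import java.nio.file.{Path, Paths}

    // Sketch only: the relativize/subpath trick from the hunk above,
    // shown with illustrative paths.
    object ExecutablePathSketch {
      def main(args: Array[String]): Unit = {
        val amaDist: Path = Paths.get("/opt/amaterasu/dist")                    // stand-in for amaDist.toPath
        val source: Path  = Paths.get("/opt/amaterasu/dist/job-1/artifact.zip") // stand-in for sourcePath.toPath

        // Drop the leading "/" so name indices line up, then cut away
        // the amaDist components, keeping only what sits under dist/.
        val relative   = amaDist.getRoot.relativize(source)
        val executable = relative.subpath(amaDist.getNameCount, relative.getNameCount)

        println(executable) // prints: job-1/artifact.zip
      }
    }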
diff --cc sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
index 57736cc,9af488e..eb75b2d
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
@@@ -17,10 -17,10 +17,12 @@@
  package org.apache.amaterasu.sdk.frameworks
  
  import org.apache.amaterasu.common.dataobjects.ActionData
+ import org.apache.amaterasu.common.utils.ArtifactUtil
+ import org.apache.amaterasu.common.utils.FileUtil
 +import org.apache.amaterasu.common.logging.KLogging
 +import org.apache.amaterasu.common.logging.Logging
  
 -abstract class RunnerSetupProvider {
 +abstract class RunnerSetupProvider : Logging() {
  
      private val actionFiles = arrayOf("env.yaml", "runtime.yaml", "datastores.yaml")
  
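The functional change in this hunk is small but it is what enables the log.info calls in the PySpark provider above: RunnerSetupProvider now extends Logging, so every provider inherits a logger. A rough Scala sketch of the pattern (LoggingSketch is a stand-in, not the actual org.apache.amaterasu.common.logging.Logging class):

    // Sketch only: a base class exposing a logger to subclasses,
    // mirroring "abstract class RunnerSetupProvider : Logging()" above.
    abstract class LoggingSketch {
      protected object log {
        def info(msg: String): Unit = println(s"INFO  $msg")
      }
    }

    class ProviderSketch extends LoggingSketch {
      def setup(): Unit = log.info("runner resources resolved")
    }

    object ProviderSketchApp {
      def main(args: Array[String]): Unit = new ProviderSketch().setup()
    }
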
diff --cc sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
index 7ba06f4,0000000..9ce27b8
mode 100644,000000..100644
Binary files differ