You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@amaterasu.apache.org by ya...@apache.org on 2019/05/06 02:40:18 UTC
[incubator-amaterasu] 02/36: virtual env is now available
This is an automated email from the ASF dual-hosted git repository.
yaniv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git
commit 3a872be239c435d27a778b7a814eb0728e3df8b7
Merge: b53d7c0 b0b6472
Author: Yaniv Rodenski <ya...@shinto.io>
AuthorDate: Thu Apr 18 23:31:48 2019 +1000
virtual env is now available
common/build.gradle | 12 ++-
.../org/apache/amaterasu/common/utils/FileUtil.kt | 100 +++++++++++++++++++++
.../apache/amaterasu/common/utils/FileTestUtils.kt | 25 ++++++
.../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip | Bin 8304 -> 8304 bytes
.../dist/amaterasu_python-0.2.0-incubating-rc4.zip | Bin 6167 -> 6167 bytes
.../runners/providers/PySparkRunnerProvider.scala | 10 +--
.../providers/SparkSubmitScalaRunnerProvider.scala | 15 ++--
.../amaterasu_pyspark-0.2.0-incubating-rc4.zip | Bin 14488 -> 14488 bytes
.../amaterasu/leader/yarn/ApplicationMaster.kt | 12 ++-
.../leader/mesos/schedulers/JobScheduler.scala | 40 +++++++--
.../sdk/frameworks/RunnerSetupProvider.kt | 31 +++++--
.../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip | Bin 14898 -> 14898 bytes
12 files changed, 213 insertions(+), 32 deletions(-)
diff --cc frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
index 69a9002,0000000..f4aa8c6
mode 100644,000000..100644
Binary files differ
diff --cc frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
index 448adf5,0000000..f62dc5a
mode 100644,000000..100644
Binary files differ
diff --cc frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 465691c,d0a2442..fd868ac
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@@ -1,31 -1,48 +1,31 @@@
package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-import java.net.URLEncoder
-
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.utilities.DataLoader
-import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
-import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.yarn.api.ApplicationConstants
-
-class PySparkRunnerProvider extends RunnerSetupProvider {
-
- private var conf: ClusterConfig = _
- private val libPath = System.getProperty("java.library.path")
-
- override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match {
- case "mesos" =>
- s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " +
- s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " +
- s"-Dscala.usejavacp=true -Djava.library.path=$libPath org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}.stripMargin"
- case "yarn" => "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && " +
- s"/bin/bash ${StringUtils.stripStart(conf.spark.home,"/")}/conf/load-spark.sh && " +
- s"java -cp ${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/ " +
- "-Xmx2G " +
- "-Dscala.usejavacp=true " +
- "-Dhdp.version=2.6.5.0-292 " +
- "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
- s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
- s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
- s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
- case _ => ""
- }
+import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
- import org.apache.commons.lang.StringUtils
- override def getRunnerResources: Array[String] =
- Array[String]("miniconda.sh", "spark_intp.py", "runtime.py", "codegen.py")
+class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) {
- override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] =
- Array[String]()
+ override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
- //val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
++ val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
+ log.info(s"===> Cluster manager: ${conf.mode}")
- //command +
- // s"$$SPARK_HOME/conf/spark-env.sh && env PYSPARK_PYTHON=$getVirtualPythonPath " +
- s"$$SPARK_HOME/bin/spark-submit ${actionData.getSrc}"
++ command +
++ //s" $$SPARK_HOME/conf/spark-env.sh &&" +
++ s" && env PYSPARK_PYTHON=$getVirtualPythonPath" +
++ s" $$SPARK_HOME/bin/spark-submit ${actionData.getSrc}"
+ }
+
+ override def getRunnerResources: Array[String] = {
+ var resources = super.getRunnerResources
+ resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip"
+ log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}")
+ resources
+ }
- override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
- Array[String]()
- override def getHasExecutor: Boolean = true
+ override def getHasExecutor: Boolean = false
+ override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]()
}
object PySparkRunnerProvider {
diff --cc frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index 9855261,0000000..c8e66c6
mode 100644,000000..100644
Binary files differ
diff --cc leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index cd4bf03,3e8a7b6..77c17d1
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@@ -249,8 -243,6 +248,8 @@@ class ApplicationMaster : KLogging(), A
jobManager.actionStarted(actionData.id)
containersIdsToTask[container.id.containerId] = actionData
+ notifier.info("created container for ${actionData.name} created")
- //ctx.localResources.forEach { t: String, u: LocalResource -> notifier.info("resource: $t = ${u.resource}") }
++ ctx.localResources.forEach { t: String, u: LocalResource -> notifier.info("resource: $t = ${u.resource}") }
log.info("launching container succeeded: ${container.id.containerId}; task: ${actionData.id}")
}
}
diff --cc leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index a87ea5d,464e3bf..f9b1060
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@@ -17,8 -17,9 +17,9 @@@
package org.apache.amaterasu.leader.mesos.schedulers
import java.io.{File, PrintWriter, StringWriter}
+ import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util
-import java.util.UUID
+import java.util.{Collections, UUID}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
@@@ -247,9 -248,33 +252,28 @@@ class JobScheduler extends AmaterasuSch
.setExtract(false)
.build()))
+ // setting up action executable
+ val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData))
+ var executable: Path = null
+ if (actionData.getHasArtifact) {
+ val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath)
+ executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount)
+ } else {
+ val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}")
+ FileUtils.moveFile(sourcePath, dest)
+ executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString)
+ }
+
+ println(s"===> executable $executable")
+ command.addUris(URI.newBuilder
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$executable")
++ .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$executable")
+ .setExecutable(false)
+ .setExtract(false)
+ .build())
+
command
.addUris(URI.newBuilder()
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
- .setExecutable(true)
- .setExtract(false)
- .build())
- .addUris(URI.newBuilder()
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
+ .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/amaterasu.properties")
.setExecutable(false)
.setExtract(false)
.build())
diff --cc sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
index 57736cc,9af488e..eb75b2d
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
@@@ -17,10 -17,10 +17,12 @@@
package org.apache.amaterasu.sdk.frameworks
import org.apache.amaterasu.common.dataobjects.ActionData
+ import org.apache.amaterasu.common.utils.ArtifactUtil
+ import org.apache.amaterasu.common.utils.FileUtil
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.common.logging.Logging
-abstract class RunnerSetupProvider {
+abstract class RunnerSetupProvider : Logging() {
private val actionFiles = arrayOf("env.yaml", "runtime.yaml", "datastores.yaml")
diff --cc sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
index 7ba06f4,0000000..9ce27b8
mode 100644,000000..100644
Binary files differ