You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@amaterasu.apache.org by GitBox <gi...@apache.org> on 2018/08/05 09:38:28 UTC

[GitHub] roadan closed pull request #32: Amaterasu 35

roadan closed pull request #32: Amaterasu 35
URL: https://github.com/apache/incubator-amaterasu/pull/32
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (forked) pull request once
it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index 5e26e45..ac442d5 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -19,50 +19,50 @@ package org.apache.amaterasu.frameworks.spark.dispatcher
 import java.io.File
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.{PySparkRunnerProvider, SparkScalaRunnerProvider}
 import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser}
-import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
 import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
-
-import scala.collection.mutable
+import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider}
 
 import scala.collection.mutable
 
 class SparkSetupProvider extends FrameworkSetupProvider {
 
-
   private var env: String = _
   private var conf: ClusterConfig = _
-  private val runnersResources = mutable.Map[String, Array[File]]()
-  //private var execData: ExecData = _
   private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig
 
+  private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = mutable.Map[String, RunnerSetupProvider]()
+
   private def loadSparkConfig: mutable.Map[String, Any] = {
+
     val execData = DataLoader.getExecutorData(env, conf)
-    val sparkExecConfigurationsurations = execData.configurations.get("spark")
-    if (sparkExecConfigurationsurations.isEmpty) {
-      throw new Exception(s"Spark configuration files could not be loaded for the environment ${env}")
+    val sparkExecConfiguration = execData.configurations.get("spark")
+    if (sparkExecConfiguration.isEmpty) {
+      throw new Exception(s"Spark configuration files could not be loaded for the environment $env")
     }
-    collection.mutable.Map(sparkExecConfigurationsurations.get.toSeq: _*)
+    collection.mutable.Map(sparkExecConfiguration.get.toSeq: _*)
+
   }
 
   override def init(env: String, conf: ClusterConfig): Unit = {
     this.env = env
     this.conf = conf
 
-    runnersResources += "scala" -> Array.empty[File]
-    runnersResources += "sql" -> Array.empty[File]
-    //TODO: Nadav needs to setup conda here
-    runnersResources += "python" -> Array.empty[File]
+    runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf))
+    runnerProviders += ("pyspark" -> PySparkRunnerProvider(conf))
+
   }
 
   override def getGroupIdentifier: String = "spark"
 
   override def getGroupResources: Array[File] = {
-    new File(conf.spark.home).listFiles
-  }
 
-  override def getRunnerResources(runnerId: String): Array[File] = {
-    runnersResources(runnerId)
+    conf.mode match {
+      case "mesos" => Array[File](new File(s"spark-${conf.Webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"))
+      case "yarn" => new File(conf.spark.home).listFiles
+      case _ => Array[File]()
+    }
   }
 
   override def getDriverConfiguration: DriverConfiguration = {
@@ -99,4 +99,8 @@ class SparkSetupProvider extends FrameworkSetupProvider {
 
     new DriverConfiguration(mem, cpu)
   }
+
+  override def getRunnerProvider(runnerId: String): RunnerSetupProvider = {
+    runnerProviders(runnerId)
+  }
 }
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
new file mode 100644
index 0000000..9d5405e
--- /dev/null
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
+import java.net.URLEncoder
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.utilities.DataLoader
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.hadoop.yarn.api.ApplicationConstants
+
+/**
+  * Runner setup provider for PySpark actions: builds the executor launch command for the
+  * configured cluster mode and lists the resource files the PySpark runner needs.
+  *
+  * Create instances through the companion object's `apply`, which injects the cluster configuration.
+  */
+class PySparkRunnerProvider extends RunnerSetupProvider {
+
+  // Assigned by the companion object's apply right after construction.
+  private var conf: ClusterConfig = _
+  private val libPath = System.getProperty("java.library.path")
+
+  /**
+    * Builds the shell command that launches the executor for an action.
+    * Mesos mode reads the AMA_NODE environment variable.
+    *
+    * @param jobId           id of the job, passed to the executor as an argument
+    * @param actionData      the action to run; its name is passed to the executor (YARN also loads its task data)
+    * @param env             environment name used on YARN to load task and executor data; unused on Mesos
+    * @param executorId      executor id passed to the YARN launcher; unused on Mesos
+    * @param callbackAddress address passed to the YARN launcher; unused on Mesos
+    * @return the command line, or an empty string when `conf.mode` is neither "mesos" nor "yarn"
+    */
+  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match {
+    case "mesos" =>
+      // The action name is the executor's last argument and must be passed verbatim: a ".stripMargin"
+      // that used to sit inside the string literal was being appended to it.
+      s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " +
+      s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " +
+      s"-Dscala.usejavacp=true -Djava.library.path=$libPath org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}"
+    // The first literal below is not an s-string, so $PWD reaches the shell unexpanded.
+    case "yarn" => "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && " +
+      s"/bin/bash spark/bin/load-spark-env.sh && " +
+      s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " +
+      "-Xmx2G " +
+      "-Dscala.usejavacp=true " +
+      "-Dhdp.version=2.6.1.0-129 " +
+      "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
+      s"'$jobId' '${conf.master}' '${actionData.name}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
+      s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
+      s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
+    case _ => ""
+  }
+
+  /** Names of the resource files the PySpark runner needs. */
+  override def getRunnerResources: Array[String] = {
+    Array[String]("miniconda.sh", "spark_intp.py", "runtime.py", "codegen.py")
+  }
+
+}
+
+/** Factory for [[PySparkRunnerProvider]]. */
+object PySparkRunnerProvider {
+  /** Creates a provider bound to the given cluster configuration. */
+  def apply(conf: ClusterConfig): PySparkRunnerProvider = {
+    val result = new PySparkRunnerProvider
+    result.conf = conf
+    result
+  }
+}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
new file mode 100644
index 0000000..c92a784
--- /dev/null
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
+import java.net.URLEncoder
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.utilities.DataLoader
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.hadoop.yarn.api.ApplicationConstants
+
+/** Runner setup provider for Spark Scala actions; obtain instances via the companion object's `apply`. */
+class SparkScalaRunnerProvider extends RunnerSetupProvider {
+  private var conf: ClusterConfig = _ // set by the companion object's apply
+  private val libPath = System.getProperty("java.library.path")
+
+  /** Builds the executor launch command for `conf.mode`; returns "" for modes other than "mesos"/"yarn". */
+  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String =
+    if (conf.mode == "mesos") {
+      val node = sys.env("AMA_NODE")
+      val envVars = s"env AMA_NODE=$node env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://$node:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz"
+      val classpath = s"executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/*"
+      val executorCall = s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}".stripMargin
+      s"$envVars java -cp $classpath -Dscala.usejavacp=true -Djava.library.path=$libPath $executorCall"
+    } else if (conf.mode == "yarn") {
+      val classpath = s"spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/"
+      val taskData = URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")
+      val execData = URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")
+      val launcherArgs = s"'$jobId' '${conf.master}' '${actionData.name}' '$taskData' '$execData' '$executorId' '$callbackAddress'"
+      val logDir = ApplicationConstants.LOG_DIR_EXPANSION_VAR
+      s"/bin/bash spark/bin/load-spark-env.sh && java -cp $classpath -Xmx2G -Dscala.usejavacp=true -Dhdp.version=2.6.1.0-129 " +
+        s"org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher $launcherArgs 1> $logDir/stdout 2> $logDir/stderr "
+    } else ""
+
+  /** The Scala runner needs no resource files of its own. */
+  override def getRunnerResources: Array[String] = Array.empty[String]
+}
+
+/** Factory for [[SparkScalaRunnerProvider]]. */
+object SparkScalaRunnerProvider {
+  /** Creates a provider bound to the given cluster configuration. */
+  def apply(conf: ClusterConfig): SparkScalaRunnerProvider = {
+    val provider = new SparkScalaRunnerProvider
+    provider.conf = conf
+    provider
+  }
+}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 1bb82ff..a6c8306 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -154,9 +154,12 @@ class JobScheduler extends AmaterasuScheduler {
             val slaveActions = executionMap(offer.getSlaveId.toString)
             slaveActions.put(taskId.getValue, ActionStatus.started)
 
+            val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+            val frameworkProvider = frameworkFactory.providers(actionData.groupId)
+            val runnerProvider = frameworkProvider.getRunnerProvider(actionData.typeId)
+
             // searching for an executor that already exist on the slave, if non exist
             // we create a new one
-            //TODO: move to .getOrElseUpdate when migrting to scala 2.11
             var executor: ExecutorInfo = null
             val slaveId = offer.getSlaveId.getValue
             slavesExecutors.synchronized {
@@ -166,52 +169,37 @@ class JobScheduler extends AmaterasuScheduler {
               }
               else {
                 val execData = DataLoader.getExecutorDataBytes(env, config)
+                val executorId = taskId.getValue + "-" + UUID.randomUUID()
+                //creating the command
 
+                println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}")
                 val command = CommandInfo
                   .newBuilder
-                  .setValue(
-                    s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-${config.version}-all.jar:spark-runner-${config.version}-all.jar:spark-runtime-${config.version}.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin
-                  )
-                  //                  HttpServer.getFilesInDirectory(sys.env("AMA_NODE"), config.Webserver.Port).foreach(f=>
-                  //                  )
+                  .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, ""))
                   .addUris(URI.newBuilder
-                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
-                  .setExecutable(false)
-                  .setExtract(false)
-                  .build())
-                  .addUris(URI.newBuilder
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-runner-${config.version}-all.jar")
+                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
                     .setExecutable(false)
                     .setExtract(false)
                     .build())
-                  .addUris(URI.newBuilder
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-runtime-${config.version}.jar")
-                    .setExecutable(false)
-                    .setExtract(false)
-                    .build())
-                  .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz")
-                    .setExecutable(false)
-                    .setExtract(true)
-                    .build())
+
+                // Getting framework resources
+                frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}")
+                  .setExecutable(false)
+                  .setExtract(true)
+                  .build()))
+
+                // Getting runner resources
+                runnerProvider.getRunnerResources.foreach(r => command.addUris(URI.newBuilder
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
+                  .setExecutable(false)
+                  .setExtract(false)
+                  .build()))
+
+                command
                   .addUris(URI.newBuilder()
                     .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh")
-                    .setExecutable(false)
-                    .setExtract(false)
-                    .build())
-                  .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark_intp.py")
-                    .setExecutable(false)
-                    .setExtract(false)
-                    .build())
-                  .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/runtime.py")
-                    .setExecutable(false)
-                    .setExtract(false)
-                    .build())
-                  .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/codegen.py")
-                    .setExecutable(false)
+                    .setExecutable(true)
                     .setExtract(false)
                     .build())
                   .addUris(URI.newBuilder()
@@ -223,7 +211,7 @@ class JobScheduler extends AmaterasuScheduler {
                   .newBuilder
                   .setData(ByteString.copyFrom(execData))
                   .setName(taskId.getValue)
-                  .setExecutorId(ExecutorID.newBuilder().setValue(taskId.getValue + "-" + UUID.randomUUID()))
+                  .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
                   .setCommand(command)
                   .build()
 
@@ -231,8 +219,6 @@ class JobScheduler extends AmaterasuScheduler {
               }
             }
 
-            val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
-            val frameworkProvider = frameworkFactory.providers(actionData.groupId)
             val driverConfiguration = frameworkProvider.getDriverConfiguration
 
             val actionTask = TaskInfo
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 1f1aa25..e28d99f 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -17,7 +17,7 @@
 package org.apache.amaterasu.leader.yarn
 
 import java.io.{File, FileInputStream, InputStream}
-import java.net.{InetAddress, ServerSocket, URLEncoder}
+import java.net.{InetAddress, ServerSocket}
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
@@ -28,7 +28,6 @@ import org.apache.activemq.broker.BrokerService
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.utilities.DataLoader
 import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
 import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
 import org.apache.amaterasu.leader.utilities.{ActiveReportListener, Args}
@@ -38,7 +37,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
@@ -244,22 +242,11 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
       val actionData = actionsBuffer.poll()
       val containerTask = Future[ActionData] {
 
-        val taskData = DataLoader.getTaskDataString(actionData, env)
-        val execData = DataLoader.getExecutorDataString(env, config)
-
+        val frameworkFactory = FrameworkProvidersFactory(env, config)
+        val framework = frameworkFactory.getFramework(actionData.groupId)
+        val runnerProvider = framework.getRunnerProvider(actionData.typeId)
         val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-        val commands: List[String] = List(
-          "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ",
-          s"/bin/bash spark/bin/load-spark-env.sh && ",
-          s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
-            "-Xmx2G " +
-            "-Dscala.usejavacp=true " +
-            "-Dhdp.version=2.6.1.0-129 " +
-            "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
-            s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(taskData, "UTF-8")}' '${URLEncoder.encode(execData, "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}' '$address' " +
-            s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
-            s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
-        )
+        val commands: List[String] = List(runnerProvider.getCommand(jobManager.jobId, actionData, env, s"${actionData.id}-${container.getId.getContainerId}", address))
 
         log.info("Running container id {}.", container.getId.getContainerId)
         log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
@@ -292,9 +279,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
           "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
           "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
 
-        val frameworkFactory = FrameworkProvidersFactory(env, config)
-        val framework = frameworkFactory.getFramework(actionData.groupId)
-
         //adding the framework and executor resources
         setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
         setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}")
diff --git a/leader/src/main/scripts/ama-start-mesos.sh b/leader/src/main/scripts/ama-start-mesos.sh
index 4a1f164..0047b38 100755
--- a/leader/src/main/scripts/ama-start-mesos.sh
+++ b/leader/src/main/scripts/ama-start-mesos.sh
@@ -94,8 +94,13 @@ case $i in
 esac
 done
 
+CP=""
+for filename in $BASEDIR/bin/*; do
+    CP+=$filename":"
+done
+
 echo "repo: ${REPO} "
-CMD="java -cp ${BASEDIR}/bin/*-all.jar -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.MesosJobLauncher --home ${BASEDIR}"
+CMD="java -cp ${CP} -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.MesosJobLauncher --home ${BASEDIR}"
 
 if [ -n "$REPO" ]; then
     CMD+=" --repo ${REPO}"
@@ -124,7 +129,7 @@ fi
 if ! ls ${BASEDIR}/dist/spark*.tgz 1> /dev/null 2>&1; then
     echo "${bold} Fetching spark distributable ${NC}"
     #wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
-    wget http://apache.mirror.digitalpacific.com.au/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
+    wget https://archive.apache.org/dist/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
 fi
 if [ ! -f ${BASEDIR}/dist/miniconda.sh ]; then
     echo "${bold}Fetching miniconda distributable ${NC}"
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
index ef53fa9..07a28b1 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
@@ -29,8 +29,8 @@
 
     File[] getGroupResources();
 
-    File[] getRunnerResources(String runnerId);
-
     DriverConfiguration getDriverConfiguration();
 
+    RunnerSetupProvider getRunnerProvider(String runnerId);
+
 }
\ No newline at end of file
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java
new file mode 100644
index 0000000..fe4086d
--- /dev/null
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.sdk.frameworks;
+
+import org.apache.amaterasu.common.dataobjects.ActionData;
+
+/** Supplies what a scheduler needs to start the executor of one runner type: the launch command and the runner's resource file names. */
+public interface RunnerSetupProvider {
+    /** Returns the command line that launches the executor for {@code actionData} of job {@code jobId} in environment {@code env}. */
+    String getCommand(String jobId, ActionData actionData, String env, String executorId, String callbackAddress);
+    /** Returns the names of the resource files this runner needs; may be empty. */
+    String[] getRunnerResources();
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services