You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@amaterasu.apache.org by ya...@apache.org on 2019/05/06 02:40:49 UTC
[incubator-amaterasu] 33/36: mesos notifications added
This is an automated email from the ASF dual-hosted git repository.
yaniv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git
commit a28eea9b69dd3085b5a87aaafdc597806ec4dcb0
Author: Yaniv Rodenski <ya...@shinto.io>
AuthorDate: Sun May 5 10:49:23 2019 +1000
mesos notifications added
---
.../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip | Bin 8304 -> 8304 bytes
.../dist/amaterasu_python-0.2.0-incubating-rc4.zip | Bin 6167 -> 6167 bytes
.../spark/dispatcher/SparkSetupProvider.kt | 16 +++
.../providers/SparkSubmitScalaRunnerProvider.kt | 16 +++
.../spark/dispatcher/SparkSetupProvider.scala | 117 ---------------------
.../runners/providers/PySparkRunnerProvider.scala | 45 --------
.../providers/SparkScalaRunnerProvider.scala | 69 ------------
.../providers/SparkSubmitScalaRunnerProvider.scala | 50 ---------
.../amaterasu_pyspark-0.2.0-incubating-rc4.zip | Bin 14488 -> 14488 bytes
.../leader/mesos/schedulers/JobScheduler.scala | 24 ++++-
.../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip | Bin 15020 -> 15020 bytes
11 files changed, 52 insertions(+), 285 deletions(-)
diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
index c625dd1..3534e3d 100644
Binary files a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip and b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip differ
diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
index 8433083..379f066 100644
Binary files a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip and b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip differ
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
index 63a4234..007dcf0 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.frameworks.spark.dispatcher
import org.apache.amaterasu.common.configuration.ClusterConfig
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
index a07747a..f13e7c9 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
import com.uchuhimo.konf.Config
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
deleted file mode 100644
index 76d0aa8..0000000
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.amaterasu.frameworks.spark.dispatcher
-//
-//import java.io.File
-//import java.util
-//
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._
-//import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser}
-//import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
-//import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider}
-//import org.apache.commons.lang.StringUtils
-//
-//import scala.collection.mutable
-//import collection.JavaConversions._
-//
-//class SparkSetupProvider extends FrameworkSetupProvider {
-//
-// private var env: String = _
-// private var conf: ClusterConfig = _
-// private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig
-//
-// private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = mutable.Map[String, RunnerSetupProvider]()
-//
-// private def loadSparkConfig: mutable.Map[String, Any] = {
-//
-// val execData = DataLoader.getExecutorData(env, conf)
-// val sparkExecConfiguration = execData.getConfigurations.get("spark")
-// if (sparkExecConfiguration.isEmpty) {
-// throw new Exception(s"Spark configuration files could not be loaded for the environment $env")
-// }
-// collection.mutable.Map(sparkExecConfiguration.toSeq: _*)
-//
-// }
-//
-// override def init(env: String, conf: ClusterConfig): Unit = {
-// this.env = env
-// this.conf = conf
-//
-//// this.sparkExecConfigurations = loadSparkConfig
-// runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf))
-// runnerProviders += ("jar" -> SparkSubmitScalaRunnerProvider(conf))
-// runnerProviders += ("pyspark" -> new PySparkRunnerProvider(env, conf))
-//
-// }
-//
-// override def getGroupIdentifier: String = "spark"
-//
-// override def getGroupResources: Array[File] = conf.mode match {
-// case "mesos" => Array[File](new File(s"spark-${conf.webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"))
-// case "yarn" => Array[File](new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"), new File(s"executor-${conf.version}-all.jar"), new File(conf.spark.home))
-// case _ => Array[File]()
-// }
-//
-//
-// override def getEnvironmentVariables: util.Map[String, String] = conf.mode match {
-// case "mesos" => Map[String, String]("SPARK_HOME" -> s"spark-${conf.webserver.sparkVersion}", "SPARK_HOME_DOCKER" -> "/opt/spark/")
-// case "yarn" => Map[String, String]("SPARK_HOME" -> StringUtils.stripStart(conf.spark.home, "/"))
-// case _ => Map[String, String]()
-// }
-//
-// override def getDriverConfiguration: DriverConfiguration = {
-// var cpu: Int = 0
-// if (sparkExecConfigurations.get("spark.yarn.am.cores").isDefined) {
-// cpu = sparkExecConfigurations("spark.yarn.am.cores").toString.toInt
-// } else if (sparkExecConfigurations.get("spark.driver.cores").isDefined) {
-// cpu = sparkExecConfigurations("spark.driver.cores").toString.toInt
-// } else if (conf.spark.opts.contains("yarn.am.cores")) {
-// cpu = conf.spark.opts("yarn.am.cores").toInt
-// } else if (conf.spark.opts.contains("driver.cores")) {
-// cpu = conf.spark.opts("driver.cores").toInt
-// } else if (conf.yarn.Worker.cores > 0) {
-// cpu = conf.yarn.Worker.cores
-// } else {
-// cpu = 1
-// }
-// var mem: Int = 0
-// if (sparkExecConfigurations.get("spark.yarn.am.memory").isDefined) {
-// mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.yarn.am.memory").toString)
-// } else if (sparkExecConfigurations.get("spark.driver.memeory").isDefined) {
-// mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.driver.memeory").toString)
-// } else if (conf.spark.opts.contains("yarn.am.memory")) {
-// mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory"))
-// } else if (conf.spark.opts.contains("driver.memory")) {
-// mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory"))
-// } else if (conf.yarn.Worker.memoryMB > 0) {
-// mem = conf.yarn.Worker.memoryMB
-// } else if (conf.taskMem > 0) {
-// mem = conf.taskMem
-// } else {
-// mem = 1024
-// }
-//
-// new DriverConfiguration(mem, cpu)
-// }
-//
-// override def getRunnerProvider(runnerId: String): RunnerSetupProvider = {
-// runnerProviders(runnerId)
-// }
-//
-// override def getConfigurationItems = Array("sparkConfiguration", "sparkExecutor")
-//}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
deleted file mode 100644
index db1e3fc..0000000
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-//
-//import com.uchuhimo.konf.Config
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.common.dataobjects.ActionData
-//import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
-//import org.apache.amaterasu.leader.common.configuration.Job
-//
-//class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) {
-//
-//
-//
-// override def getRunnerResources: Array[String] = {
-// var resources = super.getRunnerResources
-// resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip"
-// log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}")
-// resources
-// }
-//
-//
-// override def getHasExecutor: Boolean = false
-//
-// override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]()
-//
-// override def getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String = {
-//
-// val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
-// log.info(s"===> Cluster manager: ${conf.mode}")
-// command +
-// //s" $$SPARK_HOME/conf/spark-env.sh" +
-// // s" && env PYSPARK_PYTHON=$getVirtualPythonPath" +
-// //s" env PYSPARK_DRIVER_PYTHON=$getVirtualPythonPath" + d
-// s" && $$SPARK_HOME/bin/spark-submit --master ${env[Job.master]} " +
-// s"--conf spark.pyspark.python=${conf.pythonPath} " +
-// s"--conf spark.pyspark.driver.python=$getVirtualPythonPath " +
-// s"--files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}"
-// }
-//}
-//
-//object PySparkRunnerProvider {
-// def apply(env: String, conf: ClusterConfig): PySparkRunnerProvider = {
-// val result = new PySparkRunnerProvider(env, conf)
-// result
-// }
-//}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
deleted file mode 100644
index 40f9194..0000000
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-//
-//import java.net.URLEncoder
-//
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.common.dataobjects.ActionData
-//import org.apache.amaterasu.leader.common.utilities.DataLoader
-//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
-//import org.apache.commons.lang.StringUtils
-//import org.apache.hadoop.yarn.api.ApplicationConstants
-//
-//class SparkScalaRunnerProvider extends RunnerSetupProvider {
-//
-// private var conf: ClusterConfig = _
-// private val libPath = System.getProperty("java.library.path")
-//
-// override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match {
-// case "mesos" =>
-// s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath} env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.webserver.Port}/dist/spark-${conf.webserver.sparkVersion}.tgz " +
-// s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.webserver.sparkVersion}/jars/* " +
-// s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
-// s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}".stripMargin
-// case "yarn" =>
-// s"/bin/bash ${StringUtils.stripStart(conf.spark.home,"/")}/conf/spark-env.sh && " +
-// s"java -cp ${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/ " +
-// "-Xmx2G " +
-// "-Dscala.usejavacp=true " +
-// "-Dhdp.version=2.6.5.0-292 " +
-// "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
-// s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
-// s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
-// s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
-// case _ => ""
-// }
-//
-// override def getRunnerResources: Array[String] =
-// Array[String]()
-//
-// override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
-// Array[String]()
-//
-// override def getHasExecutor: Boolean = true
-//
-// override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]()
-//}
-//
-//object SparkScalaRunnerProvider {
-// def apply(conf: ClusterConfig): SparkScalaRunnerProvider = {
-// val result = new SparkScalaRunnerProvider
-// result.conf = conf
-// result
-// }
-//}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
deleted file mode 100644
index b8e9677..0000000
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-//
-//import java.io.File
-//
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.common.dataobjects.ActionData
-//import org.apache.amaterasu.common.utils.ArtifactUtil
-//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
-//
-//import scala.collection.JavaConverters._
-//
-//class SparkSubmitScalaRunnerProvider extends RunnerSetupProvider {
-//
-// private var conf: ClusterConfig = _
-// val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-// val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
-// val amalocation = new File(s"${new File(jarFile.getParent).getParent}")
-//
-// override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
-//
-// val util = new ArtifactUtil(List(actionData.repo).asJava, jobId)
-// val classParam = if (actionData.getHasArtifact) s" --class ${actionData.entryClass}" else ""
-// s"$$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.getArtifact).get(0).getName} --deploy-mode client --jars spark-runtime-${conf.version}.jar >&1"
-//
-// }
-//
-// override def getRunnerResources: Array[String] =
-// Array[String]()
-//
-// override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] =
-// Array[String]()
-//
-//
-// override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
-// Array[String]()
-//
-//
-// override def getHasExecutor: Boolean = false
-//
-//
-//}
-//
-//object SparkSubmitScalaRunnerProvider {
-// def apply(conf: ClusterConfig): SparkSubmitScalaRunnerProvider = {
-// val result = new SparkSubmitScalaRunnerProvider
-//
-// result.conf = conf
-// result
-// }
-//}
\ No newline at end of file
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index 8d56b70..7c53be8 100644
Binary files a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip and b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip differ
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 293b878..f6d09f2 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -85,6 +85,7 @@ class JobScheduler extends AmaterasuScheduler {
private val executionMap: concurrent.Map[String, concurrent.Map[String, ActionStatus]] = new ConcurrentHashMap[String, concurrent.Map[String, ActionStatus]].asScala
private val lock = new ReentrantLock()
private val offersToTaskIds: concurrent.Map[String, String] = new ConcurrentHashMap[String, String].asScala
+ private val taskIdsToActions: concurrent.Map[Protos.TaskID, String] = new ConcurrentHashMap[Protos.TaskID, String].asScala
private val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
@@ -118,14 +119,26 @@ class JobScheduler extends AmaterasuScheduler {
def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
+ val actionName = taskIdsToActions(status.getTaskId)
status.getState match {
case TaskState.TASK_STARTING => log.info("Task starting ...")
- case TaskState.TASK_RUNNING => jobManager.actionStarted(status.getTaskId.getValue)
- case TaskState.TASK_FINISHED => jobManager.actionComplete(status.getTaskId.getValue)
+ case TaskState.TASK_RUNNING => {
+ jobManager.actionStarted(status.getTaskId.getValue)
+ printNotification(new Notification("", s"created container for $actionName", NotificationType.Info, NotificationLevel.Execution))
+
+ }
+ case TaskState.TASK_FINISHED => {
+ jobManager.actionComplete(status.getTaskId.getValue)
+ printNotification(new Notification("",s"Container ${status.getExecutorId.getValue} Complete with task ${status.getTaskId.getValue} with success.", NotificationType.Info, NotificationLevel.Execution))
+ }
case TaskState.TASK_FAILED |
TaskState.TASK_KILLED |
TaskState.TASK_ERROR |
- TaskState.TASK_LOST => jobManager.actionFailed(status.getTaskId.getValue, status.getMessage) //TODO: revisit this
+ TaskState.TASK_LOST => {
+ jobManager.actionFailed(status.getTaskId.getValue, status.getMessage)
+ printNotification(new Notification("", s"error launching container with ${status.getMessage} in ${status.getData}", NotificationType.Error, NotificationLevel.Execution))
+
+ }
case _ => log.warn("WTF? just got unexpected task state: " + status.getState)
}
@@ -164,6 +177,7 @@ class JobScheduler extends AmaterasuScheduler {
val actionData = jobManager.getNextActionData
if (actionData != null) {
val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
+ taskIdsToActions.put(taskId, actionData.getName)
// setting up the configuration files for the container
val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
@@ -210,7 +224,8 @@ class JobScheduler extends AmaterasuScheduler {
// copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING)
// }
val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, envConf, executorId, "")
- log.info(s"===> Command: $commandStr")
+ printNotification(new Notification("", s"container command $commandStr", NotificationType.Info, NotificationLevel.Execution))
+
val command = CommandInfo
.newBuilder
.setValue(commandStr)
@@ -335,6 +350,7 @@ class JobScheduler extends AmaterasuScheduler {
//driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
}
+ printNotification(new Notification("", s"requesting container for ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution))
driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
}
diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
index 2b0e997..8abd578 100644
Binary files a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip and b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip differ