Posted to commits@amaterasu.apache.org by ya...@apache.org on 2019/05/06 02:40:49 UTC

[incubator-amaterasu] 33/36: Mesos notifications added

This is an automated email from the ASF dual-hosted git repository.

yaniv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git

commit a28eea9b69dd3085b5a87aaafdc597806ec4dcb0
Author: Yaniv Rodenski <ya...@shinto.io>
AuthorDate: Sun May 5 10:49:23 2019 +1000

    Mesos notifications added
---
 .../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip | Bin 8304 -> 8304 bytes
 .../dist/amaterasu_python-0.2.0-incubating-rc4.zip | Bin 6167 -> 6167 bytes
 .../spark/dispatcher/SparkSetupProvider.kt         |  16 +++
 .../providers/SparkSubmitScalaRunnerProvider.kt    |  16 +++
 .../spark/dispatcher/SparkSetupProvider.scala      | 117 ---------------------
 .../runners/providers/PySparkRunnerProvider.scala  |  45 --------
 .../providers/SparkScalaRunnerProvider.scala       |  69 ------------
 .../providers/SparkSubmitScalaRunnerProvider.scala |  50 ---------
 .../amaterasu_pyspark-0.2.0-incubating-rc4.zip     | Bin 14488 -> 14488 bytes
 .../leader/mesos/schedulers/JobScheduler.scala     |  24 ++++-
 .../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip    | Bin 15020 -> 15020 bytes
 11 files changed, 52 insertions(+), 285 deletions(-)

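The substantive change in this commit is in the Mesos JobScheduler: when a task is launched, the scheduler now records which action the Mesos TaskID belongs to, and every subsequent status update is surfaced as a user-facing notification against that action. Below is a minimal, self-contained sketch of that pattern; the map layout and the Notification argument shape are taken from the diff further down, while the TaskId and Notification types here are illustrative stand-ins for the real Mesos and Amaterasu classes:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._
    import scala.collection.concurrent

    object NotificationFlowSketch {

      // Illustrative stand-ins for org.apache.mesos.Protos.TaskID and the
      // Amaterasu notification types used in the JobScheduler diff below.
      final case class TaskId(value: String)
      final case class Notification(line: String, msg: String, kind: String, level: String)

      // Same bookkeeping the commit adds to JobScheduler: populated when a
      // task is launched, read back on every Mesos status update.
      private val taskIdsToActions: concurrent.Map[TaskId, String] =
        new ConcurrentHashMap[TaskId, String].asScala

      def onLaunch(taskId: TaskId, actionName: String): Unit =
        taskIdsToActions.put(taskId, actionName)

      def onStatusUpdate(taskId: TaskId, state: String): Unit = {
        val actionName = taskIdsToActions.getOrElse(taskId, taskId.value)
        val notification = state match {
          case "TASK_RUNNING"  => Notification("", s"Created container for $actionName", "Info", "Execution")
          case "TASK_FINISHED" => Notification("", s"Container for $actionName completed successfully", "Info", "Execution")
          case other           => Notification("", s"$actionName ended in state $other", "Error", "Execution")
        }
        println(notification) // the real scheduler calls printNotification(...)
      }
    }
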
diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
index c625dd1..3534e3d 100644
Binary files a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip and b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip differ
diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
index 8433083..379f066 100644
Binary files a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip and b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip differ
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
index 63a4234..007dcf0 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.frameworks.spark.dispatcher
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
index a07747a..f13e7c9 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
 
 import com.uchuhimo.konf.Config
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
deleted file mode 100644
index 76d0aa8..0000000
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *      http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.amaterasu.frameworks.spark.dispatcher
-//
-//import java.io.File
-//import java.util
-//
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._
-//import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser}
-//import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
-//import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider}
-//import org.apache.commons.lang.StringUtils
-//
-//import scala.collection.mutable
-//import collection.JavaConversions._
-//
-//class SparkSetupProvider extends FrameworkSetupProvider {
-//
-//  private var env: String = _
-//  private var conf: ClusterConfig = _
-//  private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig
-//
-//  private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = mutable.Map[String, RunnerSetupProvider]()
-//
-//  private def loadSparkConfig: mutable.Map[String, Any] = {
-//
-//    val execData = DataLoader.getExecutorData(env, conf)
-//    val sparkExecConfiguration = execData.getConfigurations.get("spark")
-//    if (sparkExecConfiguration.isEmpty) {
-//      throw new Exception(s"Spark configuration files could not be loaded for the environment $env")
-//    }
-//    collection.mutable.Map(sparkExecConfiguration.toSeq: _*)
-//
-//  }
-//
-//  override def init(env: String, conf: ClusterConfig): Unit = {
-//    this.env = env
-//    this.conf = conf
-//
-////    this.sparkExecConfigurations = loadSparkConfig
-//    runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf))
-//    runnerProviders += ("jar" -> SparkSubmitScalaRunnerProvider(conf))
-//    runnerProviders += ("pyspark" -> new PySparkRunnerProvider(env, conf))
-//
-//  }
-//
-//  override def getGroupIdentifier: String = "spark"
-//
-//  override def getGroupResources: Array[File] = conf.mode match {
-//    case "mesos" => Array[File](new File(s"spark-${conf.webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"))
-//    case "yarn" => Array[File](new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"), new File(s"executor-${conf.version}-all.jar"), new File(conf.spark.home))
-//    case _ => Array[File]()
-//  }
-//
-//
-//  override def getEnvironmentVariables: util.Map[String, String] = conf.mode match {
-//    case "mesos" => Map[String, String]("SPARK_HOME" -> s"spark-${conf.webserver.sparkVersion}", "SPARK_HOME_DOCKER" -> "/opt/spark/")
-//    case "yarn" => Map[String, String]("SPARK_HOME" -> StringUtils.stripStart(conf.spark.home, "/"))
-//    case _ => Map[String, String]()
-//  }
-//
-//  override def getDriverConfiguration: DriverConfiguration = {
-//    var cpu: Int = 0
-//    if (sparkExecConfigurations.get("spark.yarn.am.cores").isDefined) {
-//      cpu = sparkExecConfigurations("spark.yarn.am.cores").toString.toInt
-//    } else if (sparkExecConfigurations.get("spark.driver.cores").isDefined) {
-//      cpu = sparkExecConfigurations("spark.driver.cores").toString.toInt
-//    } else if (conf.spark.opts.contains("yarn.am.cores")) {
-//      cpu = conf.spark.opts("yarn.am.cores").toInt
-//    } else if (conf.spark.opts.contains("driver.cores")) {
-//      cpu = conf.spark.opts("driver.cores").toInt
-//    } else if (conf.yarn.Worker.cores > 0) {
-//      cpu = conf.yarn.Worker.cores
-//    } else {
-//      cpu = 1
-//    }
-//    var mem: Int = 0
-//    if (sparkExecConfigurations.get("spark.yarn.am.memory").isDefined) {
-//      mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.yarn.am.memory").toString)
-//    } else if (sparkExecConfigurations.get("spark.driver.memeory").isDefined) {
-//      mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.driver.memeory").toString)
-//    } else if (conf.spark.opts.contains("yarn.am.memory")) {
-//      mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory"))
-//    } else if (conf.spark.opts.contains("driver.memory")) {
-//      mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory"))
-//    } else if (conf.yarn.Worker.memoryMB > 0) {
-//      mem = conf.yarn.Worker.memoryMB
-//    } else if (conf.taskMem > 0) {
-//      mem = conf.taskMem
-//    } else {
-//      mem = 1024
-//    }
-//
-//    new DriverConfiguration(mem, cpu)
-//  }
-//
-//  override def getRunnerProvider(runnerId: String): RunnerSetupProvider = {
-//    runnerProviders(runnerId)
-//  }
-//
-//  override def getConfigurationItems = Array("sparkConfiguration", "sparkExecutor")
-//}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
deleted file mode 100644
index db1e3fc..0000000
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-//
-//import com.uchuhimo.konf.Config
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.common.dataobjects.ActionData
-//import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
-//import org.apache.amaterasu.leader.common.configuration.Job
-//
-//class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) {
-//
-//
-//
-//  override def getRunnerResources: Array[String] = {
-//    var resources = super.getRunnerResources
-//    resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip"
-//    log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}")
-//    resources
-//  }
-//
-//
-//  override def getHasExecutor: Boolean = false
-//
-//  override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]()
-//
-//  override def getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String = {
-//
-//    val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
-//        log.info(s"===> Cluster manager: ${conf.mode}")
-//        command +
-//          //s" $$SPARK_HOME/conf/spark-env.sh" +
-//          // s" && env PYSPARK_PYTHON=$getVirtualPythonPath" +
-//          //s" env PYSPARK_DRIVER_PYTHON=$getVirtualPythonPath" + d
-//          s" && $$SPARK_HOME/bin/spark-submit --master ${env[Job.master]} " +
-//          s"--conf spark.pyspark.python=${conf.pythonPath} " +
-//          s"--conf spark.pyspark.driver.python=$getVirtualPythonPath " +
-//          s"--files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}"
-//  }
-//}
-//
-//object PySparkRunnerProvider {
-//  def apply(env: String, conf: ClusterConfig): PySparkRunnerProvider = {
-//    val result = new PySparkRunnerProvider(env, conf)
-//    result
-//  }
-//}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
deleted file mode 100644
index 40f9194..0000000
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *      http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-//
-//import java.net.URLEncoder
-//
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.common.dataobjects.ActionData
-//import org.apache.amaterasu.leader.common.utilities.DataLoader
-//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
-//import org.apache.commons.lang.StringUtils
-//import org.apache.hadoop.yarn.api.ApplicationConstants
-//
-//class SparkScalaRunnerProvider extends RunnerSetupProvider {
-//
-//  private var conf: ClusterConfig = _
-//  private val libPath = System.getProperty("java.library.path")
-//
-//  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match {
-//    case "mesos" =>
-//      s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath} env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.webserver.Port}/dist/spark-${conf.webserver.sparkVersion}.tgz " +
-//      s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.webserver.sparkVersion}/jars/* " +
-//      s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
-//      s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}".stripMargin
-//    case "yarn" =>
-//      s"/bin/bash ${StringUtils.stripStart(conf.spark.home,"/")}/conf/spark-env.sh && " +
-//      s"java -cp ${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/ " +
-//      "-Xmx2G " +
-//      "-Dscala.usejavacp=true " +
-//      "-Dhdp.version=2.6.5.0-292 " +
-//      "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
-//      s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
-//      s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
-//      s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
-//    case _ => ""
-//  }
-//
-//  override def getRunnerResources: Array[String] =
-//    Array[String]()
-//
-//  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
-//    Array[String]()
-//
-//  override def getHasExecutor: Boolean = true
-//
-//  override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]()
-//}
-//
-//object SparkScalaRunnerProvider {
-//  def apply(conf: ClusterConfig): SparkScalaRunnerProvider = {
-//    val result = new SparkScalaRunnerProvider
-//    result.conf = conf
-//    result
-//  }
-//}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
deleted file mode 100644
index b8e9677..0000000
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-//
-//import java.io.File
-//
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.common.dataobjects.ActionData
-//import org.apache.amaterasu.common.utils.ArtifactUtil
-//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
-//
-//import scala.collection.JavaConverters._
-//
-//class SparkSubmitScalaRunnerProvider extends RunnerSetupProvider {
-//
-//  private var conf: ClusterConfig = _
-//  val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-//  val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
-//  val amalocation = new File(s"${new File(jarFile.getParent).getParent}")
-//
-//  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
-//
-//    val util = new ArtifactUtil(List(actionData.repo).asJava, jobId)
-//    val classParam = if (actionData.getHasArtifact) s" --class ${actionData.entryClass}" else ""
-//    s"$$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.getArtifact).get(0).getName} --deploy-mode client --jars spark-runtime-${conf.version}.jar >&1"
-//
-//  }
-//
-//  override def getRunnerResources: Array[String] =
-//    Array[String]()
-//
-//  override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] =
-//    Array[String]()
-//
-//
-//  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
-//    Array[String]()
-//
-//
-//  override def getHasExecutor: Boolean = false
-//
-//
-//}
-//
-//object SparkSubmitScalaRunnerProvider {
-//  def apply(conf: ClusterConfig): SparkSubmitScalaRunnerProvider = {
-//    val result = new SparkSubmitScalaRunnerProvider
-//
-//    result.conf = conf
-//    result
-//  }
-//}
\ No newline at end of file
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index 8d56b70..7c53be8 100644
Binary files a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip and b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip differ
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 293b878..f6d09f2 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -85,6 +85,7 @@ class JobScheduler extends AmaterasuScheduler {
   private val executionMap: concurrent.Map[String, concurrent.Map[String, ActionStatus]] = new ConcurrentHashMap[String, concurrent.Map[String, ActionStatus]].asScala
   private val lock = new ReentrantLock()
   private val offersToTaskIds: concurrent.Map[String, String] = new ConcurrentHashMap[String, String].asScala
+  private val taskIdsToActions: concurrent.Map[Protos.TaskID, String] = new ConcurrentHashMap[Protos.TaskID, String].asScala
 
   private val mapper = new ObjectMapper()
   mapper.registerModule(DefaultScalaModule)
@@ -118,14 +119,26 @@ class JobScheduler extends AmaterasuScheduler {
 
   def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
 
+    val actionName = taskIdsToActions(status.getTaskId)
     status.getState match {
       case TaskState.TASK_STARTING => log.info("Task starting ...")
-      case TaskState.TASK_RUNNING => jobManager.actionStarted(status.getTaskId.getValue)
-      case TaskState.TASK_FINISHED => jobManager.actionComplete(status.getTaskId.getValue)
+      case TaskState.TASK_RUNNING => {
+        jobManager.actionStarted(status.getTaskId.getValue)
+        printNotification(new Notification("", s"Created container for $actionName", NotificationType.Info, NotificationLevel.Execution))
+
+      }
+      case TaskState.TASK_FINISHED => {
+        jobManager.actionComplete(status.getTaskId.getValue)
+        printNotification(new Notification("", s"Container ${status.getExecutorId.getValue} completed task ${status.getTaskId.getValue} successfully", NotificationType.Info, NotificationLevel.Execution))
+      }
       case TaskState.TASK_FAILED |
            TaskState.TASK_KILLED |
            TaskState.TASK_ERROR |
-           TaskState.TASK_LOST => jobManager.actionFailed(status.getTaskId.getValue, status.getMessage) //TODO: revisit this
+           TaskState.TASK_LOST => {
+        jobManager.actionFailed(status.getTaskId.getValue, status.getMessage)
+        printNotification(new Notification("", s"Error launching container: ${status.getMessage} (data: ${status.getData})", NotificationType.Error, NotificationLevel.Execution))
+
+      }
       case _ => log.warn("WTF? just got unexpected task state: " + status.getState)
     }
 
@@ -164,6 +177,7 @@ class JobScheduler extends AmaterasuScheduler {
           val actionData = jobManager.getNextActionData
           if (actionData != null) {
             val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
+            taskIdsToActions.put(taskId, actionData.getName)
             // setting up the configuration files for the container
             val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
             writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
@@ -210,7 +224,8 @@ class JobScheduler extends AmaterasuScheduler {
             //                            copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING)
             //                          }
             val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, envConf, executorId, "")
-            log.info(s"===> Command: $commandStr")
+            printNotification(new Notification("", s"Container command: $commandStr", NotificationType.Info, NotificationLevel.Execution))
+
             val command = CommandInfo
               .newBuilder
               .setValue(commandStr)
@@ -335,6 +350,7 @@ class JobScheduler extends AmaterasuScheduler {
               //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
             }
 
+            printNotification(new Notification("", s"Requesting container for ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution))
             driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
 
           }
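One caveat with the lookup this change adds at the top of statusUpdate: taskIdsToActions(status.getTaskId) throws a NoSuchElementException if Mesos ever delivers a status for a task this scheduler instance did not register (for example, a reconciliation update arriving after a scheduler restart). A defensive variant, not part of this commit, would fall back to the raw task id:

    import org.apache.mesos.Protos
    import scala.collection.concurrent

    // Hypothetical helper, not part of the commit: resolve the action name
    // for a task, falling back to the raw task id when no mapping exists.
    def actionNameFor(taskId: Protos.TaskID,
                      taskIdsToActions: concurrent.Map[Protos.TaskID, String]): String =
      taskIdsToActions.getOrElse(taskId, taskId.getValue)
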
diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
index 2b0e997..8abd578 100644
Binary files a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip and b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip differ