Posted to dev@amaterasu.apache.org by GitBox <gi...@apache.org> on 2018/02/13 08:39:24 UTC

[GitHub] eyalbenivri closed pull request #6: Amaterasu 15

eyalbenivri closed pull request #6: Amaterasu 15
URL: https://github.com/apache/incubator-amaterasu/pull/6
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original
diff once a forked pull request is merged, it is reproduced below for the sake
of provenance:

diff --git a/common/build.gradle b/common/build.gradle
index c63c5a3..0b6cf8a 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -43,6 +43,7 @@ dependencies {
     compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
     compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
     compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.9'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4'
 
     // currently we have to use this specific mesos version to prevent from
     // clashing with spark
diff --git a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala
index 6f14c2d..8a44019 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala
@@ -16,8 +16,9 @@
  */
 package org.apache.amaterasu.common.execution.actions
 
-import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel
-import org.apache.amaterasu.common.execution.actions.NotificationType.NotificationType
+import NotificationLevel.NotificationLevel
+import NotificationType.NotificationType
+import com.fasterxml.jackson.annotation.JsonProperty
 
 abstract class Notifier {
 
@@ -33,22 +34,22 @@ abstract class Notifier {
 object NotificationType extends Enumeration {
 
   type NotificationType = Value
-  val success = Value("success")
-  val error = Value("error")
-  val info = Value("info")
+  val success: NotificationType.Value = Value("success")
+  val error: NotificationType.Value = Value("error")
+  val info: NotificationType.Value = Value("info")
 
 }
 
 object NotificationLevel extends Enumeration {
 
   type NotificationLevel = Value
-  val execution = Value("execution")
-  val code = Value("code")
-  val none = Value("none")
+  val execution: NotificationLevel.Value = Value("execution")
+  val code: NotificationLevel.Value = Value("code")
+  val none: NotificationLevel.Value = Value("none")
 
 }
 
-case class Notification(line: String,
-                        msg: String,
-                        notType: NotificationType,
-                        notLevel: NotificationLevel)
+case class Notification(@JsonProperty("line") line: String,
+                        @JsonProperty("msg") msg: String,
+                        @JsonProperty("notType") notType: NotificationType,
+                        @JsonProperty("notLevel") notLevel: NotificationLevel)
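
The enumeration members are now explicitly typed and the case-class fields carry @JsonProperty annotations, so a notification serialized on an executor can be rebuilt on the leader side; the ActiveReportListener introduced later in this PR resolves the enum members by name. A minimal sketch of that name round trip (standard scala.Enumeration behaviour, shown only for orientation):

    import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType}

    // Value("success") registers the member under that name,
    // so withName(...) resolves the same member back from a plain string
    val notType  = NotificationType.withName("success")      // NotificationType.success
    val notLevel = NotificationLevel.withName("execution")   // NotificationLevel.execution
    val sample   = Notification("line 1", "ok", notType, notLevel)
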
diff --git a/executor/build.gradle b/executor/build.gradle
index a081759..30076d4 100644
--- a/executor/build.gradle
+++ b/executor/build.gradle
@@ -65,6 +65,8 @@ dependencies {
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.5'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.5'
     compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.5'
+    compile group: 'org.apache.activemq', name: 'activemq-client', version: '5.15.2'
+    compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
 
     compile project(':common')
     compile project(':amaterasu-sdk')
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
new file mode 100644
index 0000000..2f73b71
--- /dev/null
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
@@ -0,0 +1,72 @@
+package org.apache.amaterasu.executor.common.executors
+
+import javax.jms.{DeliveryMode, MessageProducer, Session}
+
+
+import net.liftweb.json._
+import net.liftweb.json.Serialization.write
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType, Notifier}
+import org.apache.amaterasu.common.logging.Logging
+
+class ActiveNotifier extends Notifier with Logging {
+
+  var producer: MessageProducer = _
+  var session: Session = _
+
+  implicit val formats = DefaultFormats
+
+  override def info(message: String): Unit = {
+
+    log.info(message)
+
+    val notification = Notification("", message, NotificationType.info, NotificationLevel.execution)
+    val notificationJson = write(notification)
+    val msg = session.createTextMessage(notificationJson)
+    producer.send(msg)
+
+  }
+
+  override def success(line: String): Unit = {
+
+    log.info(s"successfully executed line: $line")
+
+    val notification = Notification(line, "", NotificationType.success, NotificationLevel.code)
+    val notificationJson = write(notification)
+    val msg = session.createTextMessage(notificationJson)
+    producer.send(msg)
+
+  }
+
+  override def error(line: String, message: String): Unit = {
+
+    log.error(s"Error executing line: $line message: $message")
+
+    val notification = Notification(line, message, NotificationType.error, NotificationLevel.code)
+    val notificationJson = write(notification)
+    val msg = session.createTextMessage(notificationJson)
+    producer.send(msg)
+
+  }
+}
+
+object ActiveNotifier extends Logging {
+  def apply(address: String): ActiveNotifier = {
+
+    // setting up activeMQ connection
+    val connectionFactory = new ActiveMQConnectionFactory(address)
+    val connection = connectionFactory.createConnection()
+    connection.start()
+    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val destination = session.createTopic("JOB.REPORT")
+    val producer = session.createProducer(destination)
+    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
+
+    // creating notifier
+    val notifier = new ActiveNotifier
+    notifier.session = session
+    notifier.producer = producer
+
+    notifier
+  }
+}
\ No newline at end of file
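
The companion object's apply sets up the ActiveMQ connection, the JOB.REPORT topic and the producer, so a runner only needs a broker address to start reporting. A minimal usage sketch (the address below is a placeholder; in this PR the real one is passed to the executor as a launch argument and points at the ApplicationMaster's embedded broker):

    import org.apache.amaterasu.executor.common.executors.ActiveNotifier

    // placeholder broker URI, for illustration only
    val notifier = ActiveNotifier("tcp://localhost:61616")

    notifier.info("================= started action sample =================")
    notifier.success("df.write.mode(SaveMode.Overwrite).save(path)")
    notifier.error("df.write.mode(SaveMode.Overwrite).save(path)", "path not found")
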
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
index e99acfe..a091c1b 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
@@ -28,33 +28,33 @@ class MesosNotifier(driver: ExecutorDriver) extends Notifier with Logging {
   private val mapper = new ObjectMapper()
   mapper.registerModule(DefaultScalaModule)
 
-  override def success(line: String) = {
+  override def success(line: String): Unit = {
 
     log.info(s"successfully executed line: $line")
 
-    val notification = new Notification(line, "", NotificationType.success, NotificationLevel.code)
+    val notification = Notification(line, "", NotificationType.success, NotificationLevel.code)
     val msg = mapper.writeValueAsBytes(notification)
 
     driver.sendFrameworkMessage(msg)
 
   }
 
-  override def error(line: String, message: String) = {
+  override def error(line: String, message: String): Unit = {
 
     log.error(s"Error executing line: $line message: $message")
 
-    val notification = new Notification(line, message, NotificationType.error, NotificationLevel.code)
+    val notification = Notification(line, message, NotificationType.error, NotificationLevel.code)
     val msg = mapper.writeValueAsBytes(notification)
 
     driver.sendFrameworkMessage(msg)
 
   }
 
-  override def info(message: String) = {
+  override def info(message: String): Unit = {
 
     log.info(message)
 
-    val notification = new Notification("", message, NotificationType.info, NotificationLevel.execution)
+    val notification = Notification("", message, NotificationType.info, NotificationLevel.execution)
     val msg = mapper.writeValueAsBytes(notification)
 
     driver.sendFrameworkMessage(msg)
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index 05637cb..d437778 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -8,7 +8,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.apache.amaterasu.executor.common.executors.{ActiveNotifier, ProvidersFactory}
 import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark.SparkContext
@@ -53,12 +53,6 @@ class ActionsExecutor extends Logging {
 //s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(gson.toJson(taskData), "UTF-8")}' '${URLEncoder.encode(gson.toJson(execData), "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}'"
 object ActionsExecutorLauncher extends App with Logging {
 
-  def urlses(cl: ClassLoader): Array[java.net.URL] = cl match {
-    case null => Array()
-    case u: java.net.URLClassLoader => u.getURLs() ++ urlses(cl.getParent)
-    case _ => urlses(cl.getParent)
-  }
-
   val hostName = InetAddress.getLocalHost.getHostName
 
   log.info(s"Hostname resolved to: $hostName")
@@ -67,14 +61,11 @@ object ActionsExecutorLauncher extends App with Logging {
 
   log.info("Starting actions executor")
 
-  val urls = urlses(getClass.getClassLoader)
-
-  log.info("Current classpath is:")
-  log.info(urls.mkString("\n"))
-
   val jobId = this.args(0)
   val master = this.args(1)
   val actionName = this.args(2)
+  val notificationsAddress = this.args(6)
+
   log.info("parsing task data")
   val taskData = mapper.readValue(URLDecoder.decode(this.args(3), "UTF-8"), classOf[TaskData])
   log.info("parsing executor data")
@@ -89,7 +80,7 @@ object ActionsExecutorLauncher extends App with Logging {
 
   log.info("Setup executor")
   val baos = new ByteArrayOutputStream()
-  val notifier = new YarnNotifier(new YarnConfiguration())
+  val notifier = ActiveNotifier(notificationsAddress)
 
   log.info("Setup notifier")
   actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties")
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
index 1fb2d85..9261080 100755
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
@@ -57,7 +57,7 @@ class SparkScalaRunner(var env: Environment,
   def interpretSources(source: Source, actionName: String, exports: Map[String, String]): Unit = {
 
     notifier.info(s"================= started action $actionName =================")
-    notifier.info(s"exports is: $exports")
+    //notifier.info(s"exports is: $exports")
 
     for (line <- source.getLines()) {
 
@@ -86,8 +86,8 @@ class SparkScalaRunner(var env: Environment,
 
               val resultName = interpreter.prevRequestList.last.termNames.last
 
-              notifier.info(s" result name ${resultName.toString}")
-              notifier.info(s" exist in exports: ${exports.contains(resultName.toString)}")
+              //notifier.info(s" result name ${resultName.toString}")
+              //notifier.info(s" exist in exports: ${exports.contains(resultName.toString)}")
 
               if (exports.contains(resultName.toString)) {
 
@@ -99,7 +99,7 @@ class SparkScalaRunner(var env: Environment,
                     case ds: Dataset[_] =>
                       log.debug(s"persisting DataFrame: $resultName")
                       val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")"""
-                      notifier.info(writeLine)
+                      //notifier.info(writeLine)
                       val writeResult = interpreter.interpret(writeLine)
                       if (writeResult != Results.Success) {
                         val err = outStream.toString
diff --git a/leader/build.gradle b/leader/build.gradle
index 9ecd17c..429f072 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -41,12 +41,12 @@ dependencies {
 
     compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0'
     compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
-    compile group: 'org.apache.curator', name: 'curator-framework', version: '2.9.1'
+    compile group: 'org.apache.curator', name:'curator-test', version:'2.9.1'
     compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.3'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.3'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.3'
+    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.3'
     compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908'
@@ -61,6 +61,9 @@ dependencies {
     compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6'
     compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
     compile group: 'org.reflections', name: 'reflections', version: '0.9.11'
+    compile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.15.3'
+    compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
+    runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store', version: '5.15.3'
 
     testCompile project(':common')
     testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
@@ -68,7 +71,9 @@ dependencies {
     testCompile 'junit:junit:4.11'
     testCompile 'org.scalatest:scalatest_2.11:3.0.2'
     testCompile 'org.scala-lang:scala-library:2.11.8'
-    testCompile group: 'org.apache.curator', name: 'curator-test', version: '2.9.1'
+    testCompile( 'org.apache.curator:curator-test:2.9.1'){
+        exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind'
+    }
 
 }
 
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
index e085d6e..5a8665c 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
@@ -16,17 +16,18 @@
  */
 package org.apache.amaterasu.leader.yarn;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.amaterasu.common.configuration.ClusterConfig;
 
 import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory;
+import org.apache.amaterasu.leader.utilities.ActiveReportListener;
 import org.apache.amaterasu.sdk.FrameworkSetupProvider;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -37,14 +38,23 @@
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import org.apache.log4j.LogManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.*;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.*;
 
+import static java.lang.System.exit;
+
 public class Client {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(Client.class);
@@ -63,17 +73,12 @@ private LocalResource setLocalResourceFromPath(Path path) throws IOException {
         return fileResource;
     }
 
-    public void run(JobOpts opts, String[] args) throws Exception {
+    private void run(JobOpts opts, String[] args) throws Exception {
 
+        LogManager.resetConfiguration();
         ClusterConfig config = new ClusterConfig();
         config.load(new FileInputStream(opts.home + "/amaterasu.properties"));
 
-//        conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
-//        conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
-//        conf.addResource(new Path("/etc/hadoop/conf/yarn-site.xml"));
-//        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
-//        conf.set("fs.file.impl", LocalFileSystem.class.getName());
-
         // Create yarnClient
         YarnClient yarnClient = YarnClient.createYarnClient();
         yarnClient.init(conf);
@@ -85,10 +90,10 @@ public void run(JobOpts opts, String[] args) throws Exception {
             app = yarnClient.createApplication();
         } catch (YarnException e) {
             LOGGER.error("Error initializing yarn application with yarn client.", e);
-            System.exit(1);
+            exit(1);
         } catch (IOException e) {
             LOGGER.error("Error initializing yarn application with yarn client.", e);
-            System.exit(2);
+            exit(2);
         }
 
         // Setup jars on hdfs
@@ -96,7 +101,7 @@ public void run(JobOpts opts, String[] args) throws Exception {
             fs = FileSystem.get(conf);
         } catch (IOException e) {
             LOGGER.error("Eror creating HDFS client isntance.", e);
-            System.exit(3);
+            exit(3);
         }
         Path jarPath = new Path(config.YARN().hdfsJarsPath());
         Path jarPathQualified = fs.makeQualified(jarPath);
@@ -108,6 +113,7 @@ public void run(JobOpts opts, String[] args) throws Exception {
             newId = "--new-job-id " + appContext.getApplicationId().toString() + "-" + UUID.randomUUID().toString();
         }
 
+
         List<String> commands = Collections.singletonList(
                 "env AMA_NODE=" + System.getenv("AMA_NODE") + " " +
                         "$JAVA_HOME/bin/java" +
@@ -116,8 +122,8 @@ public void run(JobOpts opts, String[] args) throws Exception {
                         " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
                         joinStrings(args) +
                         newId +
-                        "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout " +
-                        "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
+                        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+                        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
         );
 
 
@@ -130,6 +136,7 @@ public void run(JobOpts opts, String[] args) throws Exception {
             if (!fs.exists(jarPathQualified)) {
                 File home = new File(opts.home);
                 fs.mkdirs(jarPathQualified);
+
                 for (File f : home.listFiles()) {
                     fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), jarPathQualified);
                 }
@@ -152,7 +159,10 @@ public void run(JobOpts opts, String[] args) throws Exception {
             }
         } catch (IOException e) {
             LOGGER.error("Error uploading ama folder to HDFS.", e);
-            System.exit(3);
+            exit(3);
+        } catch (NullPointerException ne) {
+            LOGGER.error("No files in home dir.", ne);
+            exit(4);
         }
 
 
@@ -177,7 +187,7 @@ public void run(JobOpts opts, String[] args) throws Exception {
             log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")));
         } catch (IOException e) {
             LOGGER.error("Error initializing yarn local resources.", e);
-            System.exit(4);
+            exit(4);
         }
 
         // set local resource on master container
@@ -210,14 +220,29 @@ public void run(JobOpts opts, String[] args) throws Exception {
         LOGGER.info("Submitting application {}", appId);
         try {
             yarnClient.submitApplication(appContext);
+
         } catch (YarnException e) {
             LOGGER.error("Error submitting application.", e);
-            System.exit(6);
+            exit(6);
         } catch (IOException e) {
             LOGGER.error("Error submitting application.", e);
-            System.exit(7);
+            exit(7);
         }
 
+        CuratorFramework client = CuratorFrameworkFactory.newClient(config.zk(),
+                new ExponentialBackoffRetry(1000, 3));
+        client.start();
+
+        String newJobId = newId.replace("--new-job-id ", "");
+        System.out.println("===> /" + newJobId + "-report-barrier");
+        DistributedBarrier reportBarrier = new DistributedBarrier(client, "/" + newJobId + "-report-barrier");
+        reportBarrier.setBarrier();
+        reportBarrier.waitOnBarrier();
+
+        String address = new String( client.getData().forPath("/" + newJobId + "/broker"));
+        System.out.println("===> " + address);
+        setupReportListener(address);
+
         ApplicationReport appReport = null;
         YarnApplicationState appState;
 
@@ -226,13 +251,14 @@ public void run(JobOpts opts, String[] args) throws Exception {
                 appReport = yarnClient.getApplicationReport(appId);
             } catch (YarnException e) {
                 LOGGER.error("Error getting application report.", e);
-                System.exit(8);
+                exit(8);
             } catch (IOException e) {
                 LOGGER.error("Error getting application report.", e);
-                System.exit(9);
+                exit(9);
             }
             appState = appReport.getYarnApplicationState();
             if (isAppFinished(appState)) {
+                exit(0);
                 break;
             }
             //LOGGER.info("Application not finished ({})", appReport.getProgress());
@@ -240,17 +266,13 @@ public void run(JobOpts opts, String[] args) throws Exception {
                 Thread.sleep(100);
             } catch (InterruptedException e) {
                 LOGGER.error("Interrupted while waiting for job completion.", e);
-                System.exit(137);
+                exit(137);
             }
         } while (!isAppFinished(appState));
 
         LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport.getFinalApplicationStatus(), appReport.getFinishTime());
     }
 
-    private static void copyDirRecursive(){
-
-    }
-
     private boolean isAppFinished(YarnApplicationState appState) {
         return appState == YarnApplicationState.FINISHED ||
                 appState == YarnApplicationState.KILLED ||
@@ -276,16 +298,32 @@ private static String joinStrings(String[] str) {
 
     }
 
+    private void setupReportListener(String address) throws JMSException {
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(address);
+        Connection conn = cf.createConnection();
+        conn.start();
+
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        //TODO: move to a const in common
+        Topic destination = session.createTopic("JOB.REPORT");
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new ActiveReportListener());
+
+    }
+
     private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
         Apps.addToEnvironment(appMasterEnv,
                 ApplicationConstants.Environment.CLASSPATH.name(),
-                ApplicationConstants.Environment.PWD.$() + File.separator + "*");
+                ApplicationConstants.Environment.PWD.$() + File.separator + "*", File.pathSeparator);
 
         for (String c : conf.getStrings(
                 YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                 YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
             Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name(),
-                    c.trim());
+                    c.trim(), File.pathSeparator);
         }
     }
 }
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala
index 4f61cec..f540997 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala
@@ -27,8 +27,8 @@ trait Action extends Logging {
   var actionPath: String = _
   var actionId: String = _
 
-  var data: ActionData = null
-  var client: CuratorFramework = null
+  var data: ActionData = _
+  var client: CuratorFramework = _
 
   def execute(): Unit
 
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
new file mode 100644
index 0000000..e24d979
--- /dev/null
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
@@ -0,0 +1,56 @@
+package org.apache.amaterasu.leader.utilities
+
+import javax.jms.{Message, MessageListener, TextMessage}
+
+import net.liftweb.json._
+import net.liftweb.json.JsonDSL._
+import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType}
+
+class ActiveReportListener extends MessageListener {
+
+  implicit val formats = DefaultFormats
+
+  override def onMessage(message: Message): Unit = {
+    message match {
+      case tm: TextMessage =>
+        try {
+          val notification = parseNot(parse(tm.getText))
+          printNotification(notification)
+
+        } catch {
+          case e: Exception => println(e.getMessage)
+        }
+      case _ => println("===> Unknown message")
+    }
+  }
+
+  private def parseNot(json: JValue): Notification = Notification(
+    (json \ "line").asInstanceOf[JString].values,
+    (json \ "msg").asInstanceOf[JString].values,
+    NotificationType.withName((json \ "notType" \ "name").asInstanceOf[JString].values),
+    NotificationLevel.withName((json \ "notLevel" \ "name").asInstanceOf[JString].values)
+  )
+
+
+  private def printNotification(notification: Notification): Unit = {
+
+    var color = Console.WHITE
+
+    notification.notType match {
+
+      case NotificationType.info =>
+        color = Console.WHITE
+        println(s"$color${Console.BOLD}===> ${notification.msg} ${Console.RESET}")
+      case NotificationType.success =>
+        color = Console.GREEN
+        println(s"$color${Console.BOLD}===> ${notification.line} ${Console.RESET}")
+      case NotificationType.error =>
+        color = Console.RED
+        println(s"$color${Console.BOLD}===> ${notification.line} ${Console.RESET}")
+        println(s"$color${Console.BOLD}===> ${notification.msg} ${Console.RESET}")
+
+    }
+
+  }
+}
+
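
The listener is attached to a JMS consumer on the JOB.REPORT topic; the same wiring appears in Client.setupReportListener and in ApplicationMaster.setupMessaging. Condensed into a few lines of Scala, with the broker URI as a placeholder (the real one is read from ZooKeeper):

    import javax.jms.Session
    import org.apache.activemq.ActiveMQConnectionFactory
    import org.apache.amaterasu.leader.utilities.ActiveReportListener

    val address = "tcp://app-master-host:61616" // placeholder for the address read from ZooKeeper

    val connection = new ActiveMQConnectionFactory(address).createConnection()
    connection.start()

    val session  = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val consumer = session.createConsumer(session.createTopic("JOB.REPORT"))
    consumer.setMessageListener(new ActiveReportListener)
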
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 9844d07..65efecc 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -17,21 +17,24 @@
 package org.apache.amaterasu.leader.yarn
 
 import java.io.{File, FileInputStream, InputStream}
-import java.net.URLEncoder
+import java.net.{InetAddress, ServerSocket, URLEncoder}
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+import javax.jms.Session
 
-import com.google.gson.Gson
+import org.apache.activemq.broker.BrokerService
+import org.apache.activemq.ActiveMQConnectionFactory
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
 import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
-import org.apache.amaterasu.leader.utilities.{Args, DataLoader}
+import org.apache.amaterasu.leader.utilities.{ActiveReportListener, Args, DataLoader}
+import org.apache.curator.framework.recipes.barriers.DistributedBarrier
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants
@@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.zookeeper.CreateMode
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -54,7 +58,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
 
   var capability: Resource = _
 
-  private val MAX_ATTEMPTS_PER_TASK = 3
   log.info("ApplicationMaster start")
 
   private var jobManager: JobManager = _
@@ -67,7 +70,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
   private var propPath: String = ""
   private var props: InputStream = _
   private var jarPath: Path = _
-  private var version: String = ""
   private var executorPath: Path = _
   private var executorJar: LocalResource = _
   private var propFile: LocalResource = _
@@ -75,24 +77,30 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
   private var nmClient: NMClientAsync = _
   private var allocListener: YarnRMCallbackHandler = _
   private var rmClient: AMRMClientAsync[ContainerRequest] = _
+  private var address: String = _
 
   private val containersIdsToTask: concurrent.Map[Long, ActionData] = new ConcurrentHashMap[Long, ActionData].asScala
   private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
   private val actionsBuffer: java.util.concurrent.ConcurrentLinkedQueue[ActionData] = new java.util.concurrent.ConcurrentLinkedQueue[ActionData]()
-  private val gson: Gson = new Gson()
+  private val host: String = InetAddress.getLocalHost.getHostName
+  private val broker: BrokerService = new BrokerService()
 
   def setLocalResourceFromPath(path: Path): LocalResource = {
+
     val stat = fs.getFileStatus(path)
     val fileResource = Records.newRecord(classOf[LocalResource])
+
     fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path))
     fileResource.setSize(stat.getLen)
     fileResource.setTimestamp(stat.getModificationTime)
     fileResource.setType(LocalResourceType.FILE)
     fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
     fileResource
+
   }
 
   def execute(arguments: Args): Unit = {
+
     log.info(s"started AM with args $arguments")
 
     propPath = System.getenv("PWD") + "/amaterasu.properties"
@@ -105,13 +113,24 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
 
     config = ClusterConfig(props)
 
-
     try {
       initJob(arguments)
     } catch {
-      case e: Exception => log.error("error initielzing ", e.getMessage)
+      case e: Exception => log.error("error initializing ", e.getMessage)
     }
 
+    // now that the job was initiated, the curator client is started and we can
+    // register the broker's address
+    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.jobId}/broker")
+    client.setData().forPath(s"/${jobManager.jobId}/broker", address.getBytes)
+
+    // once the broker is registered, we can remove the barrier so clients can connect
+    log.info(s"/${jobManager.jobId}-report-barrier")
+    val barrier = new DistributedBarrier(client, s"/${jobManager.jobId}-report-barrier")
+    barrier.removeBarrier()
+
+    setupMessaging(jobManager.jobId)
+
     log.info(s"Job ${jobManager.jobId} initiated with ${jobManager.registeredActions.size} actions")
 
     jarPath = new Path(config.YARN.hdfsJarsPath)
@@ -155,6 +174,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
     this.capability.setMemory(Math.min(config.taskMem, 1024))
     this.capability.setVirtualCores(1)
 
+
     while (!jobManager.outOfActions) {
       val actionData = jobManager.getNextActionData
       if (actionData != null) {
@@ -165,6 +185,22 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
     log.info("Finished asking for containers")
   }
 
+  private def setupMessaging(jobId: String): Unit = {
+
+    val cf = new ActiveMQConnectionFactory(address)
+    val conn = cf.createConnection()
+    conn.start()
+
+    val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    //TODO: move to a const in common
+    val destination = session.createTopic("JOB.REPORT")
+
+    val consumer = session.createConsumer(destination)
+    consumer.setMessageListener(new ActiveReportListener)
+
+  }
+
+
   private def askContainer(actionData: ActionData): Unit = {
 
     actionsBuffer.add(actionData)
@@ -217,7 +253,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
             "-Dscala.usejavacp=true " +
             "-Dhdp.version=2.6.1.0-129 " +
             "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
-            s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(taskData, "UTF-8")}' '${URLEncoder.encode(execData, "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}' " +
+            s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(taskData, "UTF-8")}' '${URLEncoder.encode(execData, "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}' '$address' " +
             s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
             s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
         )
@@ -228,7 +264,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
         ctx.setCommands(commands)
         ctx.setTokens(allTokens)
 
-        var resources = mutable.Map[String, LocalResource](
+        val resources = mutable.Map[String, LocalResource](
           "executor.jar" -> executorJar,
           "amaterasu.properties" -> propFile,
           // TODO: Nadav/Eyal all of these should move to the executor resource setup
@@ -355,6 +391,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
   }
 
   def initJob(args: Args): Unit = {
+
     this.env = args.env
     this.branch = args.branch
     try {
@@ -398,7 +435,8 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
   }
 }
 
-object ApplicationMaster extends App {
+object ApplicationMaster extends App with Logging {
+
 
   val parser = Args.getParser
   parser.parse(args, Args()) match {
@@ -406,9 +444,20 @@ object ApplicationMaster extends App {
     case Some(arguments: Args) =>
       val appMaster = new ApplicationMaster()
 
+      appMaster.address = s"tcp://${appMaster.host}:$generatePort"
+      appMaster.broker.addConnector(appMaster.address)
+      appMaster.broker.start()
+
+      log.info(s"broker started with address ${appMaster.address}")
       appMaster.execute(arguments)
 
     case None =>
   }
 
+  private def generatePort: Int = {
+    val socket = new ServerSocket(0)
+    val port = socket.getLocalPort
+    socket.close()
+    port
+  }
 }
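
The broker address travels from the ApplicationMaster to the submitting client through ZooKeeper: the AM writes it to /<jobId>/broker and then removes the /<jobId>-report-barrier barrier, while the client (see Client.java above) parks on that barrier and reads the address once it is released. A condensed Scala sketch of the client side of that handshake, mirroring the Java code in this PR (the ZooKeeper connect string and job id are placeholders for the values the client already holds):

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.framework.recipes.barriers.DistributedBarrier
    import org.apache.curator.retry.ExponentialBackoffRetry

    val zkConnect = "localhost:2181"   // placeholder for config.zk()
    val jobId     = "job-0000"         // placeholder for the generated job id

    val zk = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
    zk.start()

    // blocks until the AM calls removeBarrier() after publishing its broker address
    val barrier = new DistributedBarrier(zk, s"/$jobId-report-barrier")
    barrier.setBarrier()
    barrier.waitOnBarrier()

    val address = new String(zk.getData().forPath(s"/$jobId/broker"))
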
diff --git a/leader/src/main/scripts/log4j.properties b/leader/src/main/scripts/log4j.properties
index 6b40036..c5e965f 100644
--- a/leader/src/main/scripts/log4j.properties
+++ b/leader/src/main/scripts/log4j.properties
@@ -7,4 +7,4 @@ log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
 
-log4j.logger.org.reflections=OFF
\ No newline at end of file
+log4j.logger.reflections.Reflections=OFF
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services