Posted to dev@amaterasu.apache.org by GitBox <gi...@apache.org> on 2018/02/25 13:56:15 UTC

[GitHub] eyalbenivri closed pull request #11: Amaterasu-14

eyalbenivri closed pull request #11: Amaterasu-14
URL: https://github.com/apache/incubator-amaterasu/pull/11

This is a PR merged from a forked repository. Because GitHub hides the original
diff once a pull request from a fork is merged, the diff is reproduced below
for the sake of provenance:

diff --git a/common/build.gradle b/common/build.gradle
index 0b6cf8a..5a0a211 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -47,7 +47,7 @@ dependencies {
 
     // currently we have to use this specific mesos version to prevent from
     // clashing with spark
-    compile('org.apache.mesos:mesos:0.22.2:shaded-protobuf'){
+    compile('org.apache.mesos:mesos:0.22.2:shaded-protobuf') {
         exclude group: 'com.google.protobuf', module: 'protobuf-java'
     }
     provided group: 'org.apache.hadoop', name: 'hadoop-yarn-client', version: '2.7.3'
diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index 653c285..35a6339 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -40,7 +40,7 @@ class ClusterConfig extends Logging {
   var distLocation: String = "local"
   var workingFolder: String = ""
   // TODO: get rid of hard-coded version
-  var pysparkPath: String = "spark-2.1.1-bin-hadoop2.7/bin/spark-submit"
+  var pysparkPath: String = "spark-2.2.1-bin-hadoop2.7/bin/spark-submit"
   var Jar: String = _
   var JarName: String = _
   // the additionalClassPath is currently for testing purposes, when amaterasu is
diff --git a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala b/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
index e126241..f49d8ad 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
@@ -25,6 +25,6 @@ case class Environment() {
   var outputRootPath: String = ""
   var workingDir: String = ""
 
-  var configuration: Map[String, String] = null
+  var configuration: Map[String, String] = _
 
 }
\ No newline at end of file
diff --git a/executor/build.gradle b/executor/build.gradle
index 30076d4..a173c58 100644
--- a/executor/build.gradle
+++ b/executor/build.gradle
@@ -55,25 +55,29 @@ dependencies {
     compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
     compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
     compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
-
-    compile ('com.jcabi:jcabi-aether:0.10.1') {
-        exclude group: 'org.jboss.netty'
-    }
+    compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
+    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
     compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
     compile group: 'org.reflections', name: 'reflections', version: '0.9.10'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.5'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.5'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.5'
     compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.5'
-    compile group: 'org.apache.activemq', name: 'activemq-client', version: '5.15.2'
     compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
 
+    compile('com.jcabi:jcabi-aether:0.10.1') {
+        exclude group: 'org.jboss.netty'
+    }
+    compile('org.apache.activemq:activemq-client:5.15.2') {
+        exclude group: 'org.jboss.netty'
+    }
+
     compile project(':common')
     compile project(':amaterasu-sdk')
 
     //runtime dependency for spark
-    provided ('org.apache.spark:spark-repl_2.11:2.1.1')
-    provided ('org.apache.spark:spark-core_2.11:2.1.1')
+    provided('org.apache.spark:spark-repl_2.11:2.2.1')
+    provided('org.apache.spark:spark-core_2.11:2.2.1')
 
     testCompile project(':common')
     testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
@@ -81,7 +85,7 @@ dependencies {
     testCompile 'junit:junit:4.11'
     testCompile 'org.scalatest:scalatest_2.11:3.0.2'
     testCompile 'org.scala-lang:scala-library:2.11.8'
-    testCompile ('org.apache.spark:spark-repl_2.11:2.1.1')
+    testCompile('org.apache.spark:spark-repl_2.11:2.2.1')
     testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
 
 }
diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py
index f8536ea..f1752a2 100755
--- a/executor/src/main/resources/spark_intp.py
+++ b/executor/src/main/resources/spark_intp.py
@@ -22,7 +22,7 @@
 # sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
 # sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
 
-# py4j_path = 'spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
+# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
 # py4j_importer = zipimport.zipimporter(py4j_path)
 # py4j = py4j_importer.load_module('py4j')
 from py4j.java_gateway import JavaGateway, GatewayClient, java_import
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
index 2f73b71..0c2edf8 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.executor.common.executors
 
 import javax.jms.{DeliveryMode, MessageProducer, Session}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
index eec0106..5f78bc2 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
@@ -18,6 +18,7 @@ package org.apache.amaterasu.executor.common.executors
 
 import java.io.ByteArrayOutputStream
 
+import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ExecData
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
@@ -52,12 +53,18 @@ object ProvidersFactory {
     val result = new ProvidersFactory()
     val reflections = new Reflections(getClass.getClassLoader)
     val runnerTypes = reflections.getSubTypesOf(classOf[RunnersProvider]).toSet
+    val config = if (propFile != null) {
+      import java.io.FileInputStream
+      ClusterConfig.apply(new FileInputStream(propFile))
+    } else {
+      new ClusterConfig()
+    }
 
     result.providers = runnerTypes.map(r => {
 
       val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[RunnersProvider]
 
-      provider.init(data, jobId, outStream, notifier, executorId, propFile, hostName)
+      provider.init(data, jobId, outStream, notifier, executorId, config, hostName)
       notifier.info(s"a provider for group ${provider.getGroupIdentifier} was created")
       (provider.getGroupIdentifier, provider)
     }).toMap
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
index 7f5955e..94b8056 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
@@ -177,7 +177,7 @@ object PySparkRunner {
   private def installAnacondaOnNode(): Unit = {
     Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda")
     Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build")
-    Seq("bash", "-c", "ln -s $PWD/spark-2.1.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
+    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
   }
 
 
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
index ce3b2ba..d1c33bb 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
@@ -25,6 +25,7 @@ import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner
+import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner
 import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
 import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner}
 import org.eclipse.aether.util.artifact.JavaScopes
@@ -52,10 +53,9 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
                     outStream: ByteArrayOutputStream,
                     notifier: Notifier,
                     executorId: String,
-                    propFile: String,
+                    config: ClusterConfig,
                     hostName: String): Unit = {
 
-    val config = ClusterConfig(new FileInputStream(propFile))
     shellLoger = ProcessLogger(
       (o: String) => log.info(o),
       (e: String) => log.error("", e)
@@ -77,7 +77,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
     val sparkAppName = s"job_${jobId}_executor_$executorId"
 
     SparkRunnerHelper.notifier = notifier
-    val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, propFile, hostName)
+    val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, config, hostName)
 
     lazy val sparkScalaRunner = SparkScalaRunner(data.env, jobId, spark, outStream, notifier, jars)
     sparkScalaRunner.initializeAmaContext(data.env)
@@ -86,7 +86,10 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
 
     // TODO: get rid of hard-coded version
     lazy val pySparkRunner = PySparkRunner(data.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", data.pyDeps, config)
-    runners.put(pySparkRunner.getIdentifier(), pySparkRunner)
+    runners.put(pySparkRunner.getIdentifier, pySparkRunner)
+
+    lazy val sparkSqlRunner = SparkSqlRunner(data.env, jobId, notifier, spark)
+    runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
   }
 
   private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
@@ -102,7 +105,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
     // TODO: get rid of hard-coded version
     Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger
     Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger
-    Seq("bash", "-c", "ln -s $PWD/spark-2.1.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger
+    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger
   }
 
   private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
index 7a7fd0d..350ddb4 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
@@ -17,23 +17,26 @@
 package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql
 
 import java.io.File
+import java.util
 
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.executor.runtime.AmaContext
+import org.apache.amaterasu.sdk.AmaterasuRunner
 import org.apache.commons.io.FilenameUtils
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import scala.collection.JavaConverters._
 
 /**
   * Amaterasu currently supports JSON and PARQUET as data sources.
   * CSV data source support will be provided in later versions.
   */
-class SparkSqlRunner extends Logging {
+class SparkSqlRunner extends Logging with AmaterasuRunner {
   var env: Environment = _
   var notifier: Notifier = _
   var jobId: String = _
-  var actionName: String = _
+  //var actionName: String = _
   var spark: SparkSession = _
 
   /*
@@ -42,17 +45,19 @@ class SparkSqlRunner extends Logging {
                If not in Amaterasu format, then directly executes the query
   @Params: query string
    */
-  def executeQuery(query: String): Unit = {
+  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
 
-    notifier.info(s"================= executing the SQL query =================")
-    if (!query.isEmpty) {
+    notifier.info(s"================= started action $actionName =================")
 
-      if (query.toLowerCase.contains("amacontext")) {
+    if (!actionSource.isEmpty) {
+
+      var result: DataFrame = null
+      if (actionSource.toLowerCase.contains("amacontext")) {
 
         //Parse the incoming query
-        notifier.info(s"================= parsing the SQL query =================")
+        //notifier.info(s"================= parsing the SQL query =================")
 
-        val parser: List[String] = query.toLowerCase.split(" ").toList
+        val parser: List[String] = actionSource.toLowerCase.split(" ").toList
         var sqlPart1: String = ""
         var sqlPart2: String = ""
         var queryTempLen: Int = 0
@@ -93,26 +98,31 @@ class SparkSqlRunner extends Logging {
         val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat)
         loadData.createOrReplaceTempView(locationPath)
 
-        notifier.info("Executing SparkSql on: "+parsedQuery)
-        val sqlDf = spark.sql(parsedQuery)
-        //@TODO: outputFileFormat should be read from YAML file instead of input fileformat
-        writeDf(sqlDf, fileFormat, env.workingDir, jobId, actionName)
 
-        notifier.info(s"================= finished action $actionName =================")
+        try{
+
+        result = spark.sql(parsedQuery)
+        notifier.success(parsedQuery)
+        } catch {
+          case e: Exception => notifier.error(parsedQuery, e.getMessage)
+        }
+
       }
       else {
 
-        notifier.info("Executing SparkSql on: "+query)
-
-        val fildDf = spark.sql(query)
-        //@TODO: outputFileFormat should be read from YAML file instead of output fileFormat being empty
-        writeDf(fildDf, "", env.workingDir, jobId, actionName)
+        notifier.info("Executing SparkSql on: " + actionSource)
 
-        notifier.info(s"================= finished action $actionName =================")
+        result = spark.sql(actionSource)
+      }
+      val exportsBuff = exports.asScala.toBuffer
+      if (exportsBuff.nonEmpty) {
+        val exportName = exportsBuff.head._1
+        val exportFormat = exportsBuff.head._2
+        //notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName")
+        result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.workingDir}/$jobId/$actionName/$exportName")
       }
+      notifier.info(s"================= finished action $actionName =================")
     }
-
-    notifier.info(s"================= finished action $actionName =================")
   }
 
   /*
@@ -128,25 +138,7 @@ class SparkSqlRunner extends Logging {
     extensions
   }
 
-  /*
-  Method to write dataframes to a specified format
-  @Params
-  df: Dataframe to be written
-  fileFormat: same as input file format
-  workingDir: temp directory
-  jobId, actionName: As specified by the user
-  */
-  def writeDf(df: DataFrame, outputFileFormat: String, workingDir: String, jobId: String, actionName: String): Unit = {
-    outputFileFormat.toLowerCase match {
-      case "parquet" => df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
-      case "json" => df.write.mode(SaveMode.Overwrite).json(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
-      case "csv" => df.write.mode(SaveMode.Overwrite).csv(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
-      case "orc" => df.write.mode(SaveMode.Overwrite).orc(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
-      case "text" => df.write.mode(SaveMode.Overwrite).text(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
-      //case "jdbc" => df.write.mode(SaveMode.Overwrite).jdbc(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
-      case _ => df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + actionName + "Df")
-    }
-  }
+  override def getIdentifier: String = "sql"
 
 }
 
@@ -154,7 +146,7 @@ object SparkSqlRunner {
 
   def apply(env: Environment,
             jobId: String,
-            actionName: String,
+            // actionName: String,
             notifier: Notifier,
             spark: SparkSession): SparkSqlRunner = {
 
@@ -162,7 +154,7 @@ object SparkSqlRunner {
 
     sparkSqlRunnerObj.env = env
     sparkSqlRunnerObj.jobId = jobId
-    sparkSqlRunnerObj.actionName = actionName
+    //sparkSqlRunnerObj.actionName = actionName
     sparkSqlRunnerObj.notifier = notifier
     sparkSqlRunnerObj.spark = spark
     sparkSqlRunnerObj
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
similarity index 95%
rename from executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala
rename to executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
index 969eb0b..9ab75be 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
@@ -33,7 +33,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.util.{Failure, Success}
 
-class ActionsExecutor extends Executor with Logging {
+class MesosActionsExecutor extends Executor with Logging {
 
   var master: String = _
   var executorDriver: ExecutorDriver = _
@@ -83,7 +83,7 @@ class ActionsExecutor extends Executor with Logging {
     notifier = new MesosNotifier(driver)
     notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered")
     val outStream = new ByteArrayOutputStream()
-    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName)
+    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, propFile = "./amaterasu.properties")
 
   }
 
@@ -133,13 +133,13 @@ class ActionsExecutor extends Executor with Logging {
 
 }
 
-object ActionsExecutorLauncher extends Logging {
+object MesosActionsExecutor extends Logging {
 
   def main(args: Array[String]) {
     System.loadLibrary("mesos")
     log.debug("Starting a new ActionExecutor")
 
-    val executor = new ActionsExecutor
+    val executor = new MesosActionsExecutor
     executor.jobId = args(0)
     executor.master = args(1)
     executor.actionName = args(2)
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
index 9a17b5c..841fe42 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.executor.yarn.executors
 
 import org.apache.amaterasu.common.execution.actions.Notifier
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
index ba6a3e1..abab8a4 100644
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
@@ -26,7 +26,7 @@ import org.apache.amaterasu.common.utils.FileUtils
 import org.apache.spark.repl.amaterasu.AmaSparkILoop
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkConf
 
 import scala.tools.nsc.GenericRunnerSettings
 import scala.tools.nsc.interpreter.IMain
@@ -37,7 +37,6 @@ object SparkRunnerHelper extends Logging {
   private val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
   private val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
 
-  private var sparkContext: SparkContext = _
   private var sparkSession: SparkSession = _
 
   var notifier: Notifier = _
@@ -97,7 +96,7 @@ object SparkRunnerHelper extends Logging {
     }
     catch {
       case e: Exception =>
-        println("+++++++>" + new Predef.String(outStream.toByteArray))
+        println(new Predef.String(outStream.toByteArray))
 
     }
 
@@ -110,19 +109,14 @@ object SparkRunnerHelper extends Logging {
                   jars: Seq[String],
                   sparkConf: Option[Map[String, Any]],
                   executorEnv: Option[Map[String, Any]],
-                  propFile: String,
+                  config: ClusterConfig,
                   hostName: String): SparkSession = {
 
-    val config = if (propFile != null) {
-      import java.io.FileInputStream
-      ClusterConfig.apply(new FileInputStream(propFile))
-    } else {
-      new ClusterConfig()
-    }
-
     Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
-
-    val pyfiles = FileUtils.getAllFiles(new File("miniconda/pkgs")).filter(f => f.getName.endsWith(".py") ||
+    val minicondaPkgsPath = "miniconda/pkgs"
+    val executorMinicondaDirRef = new File(minicondaPkgsPath)
+    val minicondaFiles = if (executorMinicondaDirRef.exists) FileUtils.getAllFiles(executorMinicondaDirRef) else new Array[File](0)
+    val pyfiles = minicondaFiles.filter(f => f.getName.endsWith(".py") ||
       f.getName.endsWith(".egg") ||
       f.getName.endsWith(".zip"))
 
@@ -132,30 +126,36 @@ object SparkRunnerHelper extends Logging {
       .set("spark.hadoop.validateOutputSpecs", "false")
       .set("spark.logConf", "true")
       .set("spark.submit.pyFiles", pyfiles.mkString(","))
-      .setJars("executor.jar" +: jars)
 
 
+    val master: String = if (env.master.isEmpty) {
+      "yarn"
+    } else {
+      env.master
+    }
+
     config.mode match {
 
       case "mesos" =>
-        conf.set("spark.executor.uri", s"http://$getNode:${config.Webserver.Port}/spark-2.1.1-bin-hadoop2.7.tgz")
+        conf.set("spark.executor.uri", s"http://$getNode:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz")
+          .setJars(jars)
           .set("spark.master", env.master)
-          .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-2.1.1-bin-hadoop2.7")
+          .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-2.2.1-bin-hadoop2.7")
 
       case "yarn" =>
         conf.set("spark.home", config.spark.home)
           // TODO: parameterize those
+          .setJars(s"executor-${config.version}-all.jar" +: jars)
           .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
           .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
           .set("spark.yarn.queue", "default")
           .set("spark.history.kerberos.principal", "none")
 
-          .set("spark.master", "yarn")
+          .set("spark.master", master)
           .set("spark.executor.instances", "1") // TODO: change this
           .set("spark.yarn.jars", s"spark/jars/*")
           .set("spark.executor.memory", "1g")
           .set("spark.dynamicAllocation.enabled", "false")
-          //.set("spark.shuffle.service.enabled", "true")
           .set("spark.eventLog.enabled", "false")
           .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
           .set("hadoop.home.dir", config.YARN.hadoopHomeDir)
@@ -201,7 +201,6 @@ object SparkRunnerHelper extends Logging {
       //.enableHiveSupport()
       .config(conf).getOrCreate()
 
-    log.info("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
     sparkSession.conf.getAll.foreach(x => log.info(x.toString))
 
     val hc = sparkSession.sparkContext.hadoopConfiguration
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
index 9261080..a45b8c0 100755
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
@@ -99,7 +99,6 @@ class SparkScalaRunner(var env: Environment,
                     case ds: Dataset[_] =>
                       log.debug(s"persisting DataFrame: $resultName")
                       val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")"""
-                      //notifier.info(writeLine)
                       val writeResult = interpreter.interpret(writeLine)
                       if (writeResult != Results.Success) {
                         val err = outStream.toString
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
index 0312dad..153d984 100644
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
+++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
@@ -23,12 +23,13 @@ import org.apache.amaterasu.utilities.TestNotifier
 import org.apache.log4j.Logger
 import org.apache.log4j.Level
 import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
+import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
+import scala.collection.JavaConverters._
 
 /**
   * Created by kirupa on 10/12/16.
   */
+@DoNotDiscover
 class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
 
   Logger.getLogger("org").setLevel(Level.OFF)
@@ -44,7 +45,7 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
 
   override protected def beforeAll(): Unit = {
 
-    val env = new Environment()
+    val env = Environment()
     env.workingDir = "file:/tmp/"
     spark = SparkSession.builder()
       .appName("sql-job")
@@ -69,17 +70,18 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
 
   "SparkSql" should "load data as parquet if no input foramt is specified" in {
 
-    val defaultParquetEnv = new Environment()
+    val defaultParquetEnv = Environment()
     defaultParquetEnv.workingDir = "file:/tmp/"
     AmaContext.init(spark, "sparkSqlDefaultParquetJob", defaultParquetEnv)
+
     //Prepare test dataset
     val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
     inputDf.write.mode(SaveMode.Overwrite).parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionTempDf")
-    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlDefaultParquetJob", "sparkSqlDefaultParquetJobAction", notifier, spark)
-    sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlDefaultParquetJobAction_sparkSqlDefaultParquetJobActionTempDf where age=22")
-    val outputDf = spark.read.parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionDf")
-    println("Output Default Parquet: "+inputDf.count + "," + outputDf.first().getString(1))
-    outputDf.first().getString(1) shouldEqual("Michael")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlDefaultParquetJob", notifier, spark)
+    sparkSql.executeSource("select * FROM AMACONTEXT_sparkSqlDefaultParquetJobAction_sparkSqlDefaultParquetJobActionTempDf where age=22", "sql_parquet_test", Map("result" -> "parquet").asJava)
+    val outputDf = spark.read.parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sql_parquet_test/result")
+    println("Output Default Parquet: " + inputDf.count + "," + outputDf.first().getString(1))
+    outputDf.first().getString(1) shouldEqual "Michael"
   }
 
   /*
@@ -88,17 +90,18 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
 
   "SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in {
 
-    val tempParquetEnv = new Environment()
+    val tempParquetEnv = Environment()
     tempParquetEnv.workingDir = "file:/tmp/"
     AmaContext.init(spark, "sparkSqlParquetJob", tempParquetEnv)
+
     //Prepare test dataset
     val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
     inputDf.write.mode(SaveMode.Overwrite).parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionTempDf")
-    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlParquetJob", "sparkSqlParquetJobAction", notifier, spark)
-    sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlParquetJobAction_sparkSqlParquetJobActionTempDf READAS parquet")
-    val outputDf = spark.read.parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionDf")
-    println("Output Parquet: "+inputDf.count + "," + outputDf.count)
-    inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1))
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlParquetJob", notifier, spark)
+    sparkSql.executeSource("select * FROM AMACONTEXT_sparkSqlParquetJobAction_sparkSqlParquetJobActionTempDf READAS parquet", "sql_parquet_test", Map("result2" -> "parquet").asJava)
+    val outputDf = spark.read.parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sql_parquet_test/result2")
+    println("Output Parquet: " + inputDf.count + "," + outputDf.count)
+    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
   }
 
 
@@ -108,17 +111,18 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
 
   "SparkSql" should "load JSON data directly from previous action's dataframe and persist the Data in working directory" in {
 
-    val tempJsonEnv = new Environment()
+    val tempJsonEnv = Environment()
     tempJsonEnv.workingDir = "file:/tmp/"
     AmaContext.init(spark, "sparkSqlJsonJob", tempJsonEnv)
     //Prepare test dataset
+
     val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
     inputDf.write.mode(SaveMode.Overwrite).json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionTempDf")
-    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlJsonJob", "sparkSqlJsonJobAction", notifier, spark)
-    sparkSql.executeQuery("select * FROM amacontext_sparkSqlJsonJobAction_sparkSqlJsonJobActionTempDf  where age='30' READAS json")
-    val outputDf = spark.read.json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionDf")
-    println("Output JSON: "+inputDf.count + "," + outputDf.count)
-    outputDf.first().getString(1) shouldEqual("Kirupa")
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlJsonJob", notifier, spark)
+    sparkSql.executeSource("select * FROM amacontext_sparkSqlJsonJobAction_sparkSqlJsonJobActionTempDf  where age='30' READAS json", "sql_json_test", Map("result" -> "json").asJava)
+    val outputDf = spark.read.json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sql_json_test/result")
+    println("Output JSON: " + inputDf.count + "," + outputDf.count)
+    outputDf.first().getString(1) shouldEqual "Kirupa"
 
   }
 
@@ -128,40 +132,42 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll
 
   "SparkSql" should "load CSV data directly from previous action's dataframe and persist the Data in working directory" in {
 
-    val tempCsvEnv = new Environment()
+    val tempCsvEnv = Environment()
     tempCsvEnv.workingDir = "file:/tmp/"
     AmaContext.init(spark, "sparkSqlCsvJob", tempCsvEnv)
+
     //Prepare test dataset
     val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
     inputDf.write.mode(SaveMode.Overwrite).csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionTempDf")
-    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlCsvJob", "sparkSqlCsvJobAction", notifier, spark)
-    sparkSql.executeQuery("select * FROM amacontext_sparkSqlCsvJobAction_sparkSqlCsvJobActionTempDf READAS csv")
-    val outputDf = spark.read.csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionDf")
-    println("Output CSV: "+inputDf.count + "," + outputDf.count)
-    inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1))
+    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlCsvJob", notifier, spark)
+    sparkSql.executeSource("select * FROM amacontext_sparkSqlCsvJobAction_sparkSqlCsvJobActionTempDf READAS csv", "sql_csv_test", Map("result" -> "csv").asJava)
+
+    val outputDf = spark.read.csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sql_csv_test/result")
+    println("Output CSV: " + inputDf.count + "," + outputDf.count)
+    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
   }
+
   /*
   Test whether the data can be directly read from a file and executed by sparkSql
   */
-
-  "SparkSql" should "load data directly from a file and persist the Data in working directory" in {
-
-    val tempFileEnv = new Environment()
-    tempFileEnv.workingDir = "file:/tmp/"
-    AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
-
-    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", "sparkSqlFileJobAction", notifier, spark)
-    sparkSql.executeQuery("SELECT * FROM parquet.`"+getClass.getResource("/SparkSql/parquet").getPath+"`")
-    val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf")
-    println("Output Parquet dataframe: "+ outputParquetDf.show)
-    outputParquetDf.first().getString(1) shouldEqual("Michael")
-    sparkSql.executeQuery("SELECT * FROM json.`"+getClass.getResource("/SparkSql/json").getPath+"`")
-    //@TODO: change the below read.parquet to read.outputFileFormat specified in the yaml file
-    val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf")
-    println("Output Json dataframe: "+ outputJsonDf.show)
-    outputJsonDf.first().getString(1) shouldEqual("Sampath")
-
-  }
+//  "SparkSql" should "load data directly from a file and persist the Data in working directory" in {
+//
+//    val tempFileEnv = Environment()
+//    tempFileEnv.workingDir = "file:/tmp/"
+//    AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
+//
+//    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", notifier, spark)
+//    sparkSql.executeSource("SELECT * FROM parquet.`" + getClass.getResource("/SparkSql/parquet").getPath + "`", "sql_parquet_file_test", Map("result" -> "parquet").asJava)
+//    val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
+//    println("Output Parquet dataframe: " + outputParquetDf.show)
+//    outputParquetDf.first().getString(1) shouldEqual "Michael"
+//    sparkSql.executeSource("SELECT * FROM json.`" + getClass.getResource("/SparkSql/json").getPath + "`","sql_parquet_file_test", Map("result" -> "json").asJava)
+//
+//    val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
+//    println("Output Json dataframe: " + outputJsonDf.show)
+//    outputJsonDf.first().getString(1) shouldEqual "Sampath"
+//
+//  }
 
 
 }
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
index 505ade6..49ab882 100644
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
+++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
@@ -28,17 +28,25 @@ import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner
 import org.apache.spark.sql.SparkSession
 import org.scalatest._
 
+
+
 import scala.collection.mutable.ListBuffer
 
 
 class SparkTestsSuite extends Suites(
-  new PySparkRunnerTests(),
-  new RunnersLoadingTests()) with BeforeAndAfterAll {
+  new PySparkRunnerTests,
+  new RunnersLoadingTests,
+  new SparkSqlRunnerTests) with BeforeAndAfterAll {
 
   var env: Environment = _
   var factory: ProvidersFactory = _
   var spark: SparkSession = _
 
+  private def createTestMiniconda(): Unit = {
+    println(s"PATH: ${new File(".").getAbsolutePath}")
+    new File("miniconda/pkgs").mkdirs()
+  }
+
   override def beforeAll(): Unit = {
 
     env = Environment()
@@ -48,11 +56,6 @@ class SparkTestsSuite extends Suites(
     // I can't apologise enough for this
     val resources = new File(getClass.getResource("/spark_intp.py").getPath).getParent
 
-    val conf = Map[String, Any](
-      "spark.cassandra.connection.host" -> "127.0.0.1",
-      "sourceTable" -> "documents",
-      "spark.local.ip" -> "127.0.0.1"
-    )
     env.master = "local[1]"
     if (env.configuration != null) env.configuration ++ "pysparkPath" -> "/usr/bin/python" else env.configuration = Map(
       "pysparkPath" -> "/usr/bin/python",
@@ -61,6 +64,7 @@ class SparkTestsSuite extends Suites(
     val excEnv = Map[String, Any](
       "PYTHONPATH" -> resources
     )
+    createTestMiniconda()
     env.configuration ++ "spark_exec_env" -> excEnv
     factory = ProvidersFactory(ExecData(env,
       Dependencies(ListBuffer.empty[Repo], List.empty[Artifact]),
@@ -72,7 +76,8 @@ class SparkTestsSuite extends Suites(
       new ByteArrayOutputStream(),
       new TestNotifier(),
       "test",
-      getClass.getResource("/amaterasu.properties").getPath)
+      "localhost",
+      getClass.getClassLoader.getResource("amaterasu.properties").getPath)
     spark = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner].spark
 
     this.nestedSuites.filter(s => s.isInstanceOf[RunnersLoadingTests]).foreach(s => s.asInstanceOf[RunnersLoadingTests].factory = factory)
@@ -83,6 +88,7 @@ class SparkTestsSuite extends Suites(
   }
 
   override def afterAll(): Unit = {
+    new File("miniconda").delete()
     spark.stop()
 
     super.afterAll()
diff --git a/leader/build.gradle b/leader/build.gradle
index 429f072..8595d02 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -52,7 +52,7 @@ dependencies {
     compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.2.19.v20160908'
-    compile group: 'javax.servlet', name: 'javax.servlet-api', version: '2.5.0'
+    //compile group: 'javax.servlet', name: 'javax.servlet-api', version: '3.1.0'
     compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0'
     compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
     compile group: 'org.yaml', name: 'snakeyaml', version: '1.18'
@@ -71,9 +71,7 @@ dependencies {
     testCompile 'junit:junit:4.11'
     testCompile 'org.scalatest:scalatest_2.11:3.0.2'
     testCompile 'org.scala-lang:scala-library:2.11.8'
-    testCompile( 'org.apache.curator:curator-test:2.9.1'){
-        exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind'
-    }
+    testCompile 'org.apache.curator:curator-test:2.9.1'
 
 }
 
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java
index b89c263..be0fc05 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java
+++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.yarn;
 
 import org.apache.commons.cli.*;
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
index eb2659d..b8c29b7 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
+++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.yarn;
 
 public class JobOpts {
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
index 631564e..0706491 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
@@ -3,6 +3,7 @@ package org.apache.amaterasu.leader.mesos
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.leader.mesos.schedulers.JobScheduler
 import org.apache.amaterasu.leader.utilities.{Args, BaseJobLauncher}
+import org.apache.log4j.LogManager
 import org.apache.mesos.Protos.FrameworkID
 import org.apache.mesos.{MesosSchedulerDriver, Protos}
 
@@ -13,6 +14,8 @@ import org.apache.mesos.{MesosSchedulerDriver, Protos}
 object MesosJobLauncher extends BaseJobLauncher {
 
   override def run(arguments: Args, config: ClusterConfig, resume: Boolean): Unit = {
+    LogManager.resetConfiguration()
+
     val frameworkBuilder = Protos.FrameworkInfo.newBuilder()
       .setName(s"${arguments.name} - Amaterasu Job")
       .setFailoverTimeout(config.timeout)
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index f5b572a..ec9935c 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -33,6 +33,7 @@ import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
 import org.apache.amaterasu.leader.utilities.{DataLoader, HttpServer}
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.log4j.LogManager
 import org.apache.mesos.Protos.CommandInfo.URI
 import org.apache.mesos.Protos._
 import org.apache.mesos.protobuf.ByteString
@@ -48,6 +49,7 @@ import scala.collection.concurrent.TrieMap
   */
 class JobScheduler extends AmaterasuScheduler {
 
+  LogManager.resetConfiguration()
   private var jobManager: JobManager = _
   private var client: CuratorFramework = _
   private var config: ClusterConfig = _
@@ -163,7 +165,7 @@ class JobScheduler extends AmaterasuScheduler {
                 val command = CommandInfo
                   .newBuilder
                   .setValue(
-                    s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-0.2.0-incubating-all.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.ActionsExecutorLauncher ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin
+                    s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-0.2.0-incubating-all.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin
                   )
 //                  HttpServer.getFilesInDirectory(sys.env("AMA_NODE"), config.Webserver.Port).foreach(f=>
 //                  )
@@ -173,7 +175,7 @@ class JobScheduler extends AmaterasuScheduler {
                     .setExtract(false)
                     .build())
                   .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-2.1.1-bin-hadoop2.7.tgz")
+                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz")
                     .setExecutable(false)
                     .setExtract(true)
                     .build())
@@ -197,6 +199,11 @@ class JobScheduler extends AmaterasuScheduler {
                     .setExecutable(false)
                     .setExtract(false)
                     .build())
+                  .addUris(URI.newBuilder()
+                      .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
+                      .setExecutable(false)
+                      .setExtract(false)
+                      .build())
                 executor = ExecutorInfo
                   .newBuilder
                   .setData(ByteString.copyFrom(execData))
@@ -310,6 +317,7 @@ object JobScheduler {
             report: String,
             home: String): JobScheduler = {
 
+    LogManager.resetConfiguration()
     val scheduler = new JobScheduler()
 
     HttpServer.start(config.Webserver.Port, s"$home/${config.Webserver.Root}")
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
index 24b9e90..2e01963 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
@@ -16,10 +16,9 @@
  */
 package org.apache.amaterasu.leader.utilities
 
-//import org.apache.amaterasu.Logging
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.log4j.{BasicConfigurator, Level, Logger}
-import org.eclipse.jetty.server.{Server, ServerConnector}
+import org.eclipse.jetty.server.{Handler, Server, ServerConnector}
 import org.eclipse.jetty.server.handler._
 import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
 import org.eclipse.jetty.toolchain.test.MavenTestingUtils
@@ -38,39 +37,29 @@ import scala.text.Document
   * Implementation of Jetty Web server to server Amaterasu libraries and other distribution files
   */
 object HttpServer extends Logging {
-  val logger = Logger.getLogger(HttpServer.getClass)
-  var server: Server = null
+
+  var server: Server = _
 
   def start(port: String, serverRoot: String): Unit = {
 
-    /*val threadPool = new QueuedThreadPool(Runtime.getRuntime.availableProcessors() * 16)
-    threadPool.setName("Jetty")*/
     BasicConfigurator.configure()
     initLogging()
+
     server = new Server()
     val connector = new ServerConnector(server)
     connector.setPort(port.toInt)
     server.addConnector(connector)
-    val rh0 = new ResourceHandler()
-    rh0.setDirectoriesListed(true)
-    rh0.setResourceBase(serverRoot)
-    val context0 = new ContextHandler()
-    context0.setContextPath("/*")
-    //context0.setContextPath("/")
-    //val dir0 = MavenTestingUtils.getTestResourceDir("dist")
-    //context0.setBaseResource(Resource.newResource(dir0))
-    context0.setHandler(rh0)
-    val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
-    context.setResourceBase(serverRoot)
-    context.setContextPath("/")
-    context.setErrorHandler(new ErrorHandler())
-    context.setInitParameter("dirAllowed", "true")
-    context.setInitParameter("pathInfoOnly", "true")
-    context.addServlet(new ServletHolder(new DefaultServlet()), "/")
-    val contexts = new ContextHandlerCollection()
-    contexts.setHandlers(Array(context0, context))
-    server.setHandler(contexts)
+
+    val handler = new ResourceHandler()
+    handler.setDirectoriesListed(true)
+    handler.setWelcomeFiles(Array[String]("index.html"))
+    handler.setResourceBase(serverRoot)
+    val handlers = new HandlerList()
+    handlers.setHandlers(Array(handler, new DefaultHandler()))
+
+    server.setHandler(handlers)
     server.start()
+
   }
 
   def stop() {
@@ -93,11 +82,13 @@ object HttpServer extends Logging {
   Note: Should the files in URL root be fetched, provide an empty value to directory.
    */
   def getFilesInDirectory(amaNode: String, port: String, directory: String = ""): Array[String] = {
+    println("http://" + amaNode + ":" + port + "/" + directory)
     val html: BufferedSource = Source.fromURL("http://" + amaNode + ":" + port + "/" + directory)
+    println(html)
     val htmlDoc = Jsoup.parse(html.mkString)
     val htmlElement: Elements = htmlDoc.body().select("a")
     val files = htmlElement.asScala
-    val fileNames = files.map(url => url.attr("href")).filter(file => (!file.contains(".."))).map(name => name.replace("/", "")).toArray
+    val fileNames = files.map(url => url.attr("href")).filter(file => !file.contains("..")).map(name => name.replace("/", "")).toArray
     fileNames
   }
 }
\ No newline at end of file
diff --git a/leader/src/main/scripts/ama-start-mesos.sh b/leader/src/main/scripts/ama-start-mesos.sh
index 6c9139c..18dbed9 100755
--- a/leader/src/main/scripts/ama-start-mesos.sh
+++ b/leader/src/main/scripts/ama-start-mesos.sh
@@ -95,7 +95,7 @@ esac
 done
 
 echo "repo: ${REPO} "
-CMD="java -cp ${BASEDIR}/bin/*-all.jar -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.JobLauncher --home ${BASEDIR}"
+CMD="java -cp ${BASEDIR}/bin/*-all.jar -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.MesosJobLauncher --home ${BASEDIR}"
 
 if [ -n "$REPO" ]; then
     CMD+=" --repo ${REPO}"
@@ -123,13 +123,14 @@ fi
 
 if ! ls ${BASEDIR}/dist/spark*.tgz 1> /dev/null 2>&1; then
     echo "${bold} Fetching spark distributable ${NC}"
-    wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
+    #wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
+    wget http://apache.mirror.digitalpacific.com.au/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
 fi
 if [ ! -f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then
     echo "${bold}Fetching miniconda distributable ${NC}"
     wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -P ${BASEDIR}/dist
 fi
-
+cp ${BASEDIR}/amaterasu.properties ${BASEDIR}/dist
 eval $CMD | grep "===>"
 
 echo ""
diff --git a/leader/src/main/scripts/amaterasu.properties b/leader/src/main/scripts/amaterasu.properties
index 57c6006..7961db9 100755
--- a/leader/src/main/scripts/amaterasu.properties
+++ b/leader/src/main/scripts/amaterasu.properties
@@ -5,7 +5,7 @@ user=root
 mode=yarn
 webserver.port=8000
 webserver.root=dist
-spark.version=2.1.1-bin-hadoop2.7
+spark.version=2.2.1-bin-hadoop2.7
 yarn.queue=default
 yarn.jarspath=hdfs:///apps/amaterasu
 spark.home=/usr/hdp/current/spark2-client
diff --git a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
index 4034c8e..bd200a0 100644
--- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
@@ -32,46 +32,47 @@ class HttpServerTests extends FlatSpec with Matchers {
 
   // this is an ugly hack, getClass.getResource("/").getPath should have worked but
   // stopped working when we moved to gradle :(
+  val resources = new File(getClass.getResource("/simple-maki.yml").getPath).getParent
 
-
-//  "Jetty Web server" should "start HTTP server, serve content and stop successfully" in {
-//    val resources = new File(getClass.getResource("/simple-maki.yml").getPath).getParent
-//    var data = ""
-//    try {
-//      HttpServer.start("8000", resources)
-//      val html = Source.fromURL("http://localhost:8000/jetty-test-data.txt")
-//      data = html.mkString
-//    }
-//    finally {
-//      HttpServer.stop()
-//    }
-//    data should equal("This is a test file to download from Jetty webserver")
-//  }
+  //  "Jetty Web server" should "start HTTP server, serve content and stop successfully" in {
+  //
+  //    var data = ""
+  //    try {
+  //      HttpServer.start("8000", resources)
+  //      val html = Source.fromURL("http://localhost:8000/jetty-test-data.txt")
+  //      data = html.mkString
+  //    }
+  //    finally {
+  //      HttpServer.stop()
+  //    }
+  //    data should equal("This is a test file to download from Jetty webserver")
+  //  }
 
   "Jetty File server with '/' as root" should "start HTTP server, serve content and stop successfully" in {
-    var data = ""
-    val resources = new File(getClass.getResource("/dist").getPath).getParent
-    var urlCount:Int = 0
-    println("resource location"+resources)
+
+    var urlCount: Int = 0
+    println("resource location" + resources)
     try {
-      HttpServer.start("8000",resources)
-      val urls = HttpServer.getFilesInDirectory("localhost","8000","dist")
+      HttpServer.start("8000", resources)
+      val urls = HttpServer.getFilesInDirectory("127.0.0.1", "8000", "dist")
       urls.foreach(println)
       urlCount = urls.length
+    } catch {
+      case e: Exception => println(s"++++>> ${e.getMessage}")
     }
     finally {
       HttpServer.stop()
     }
     urlCount should equal(2)
   }
+
   "Jetty File server with 'dist' as root" should "start HTTP server, serve content and stop successfully" in {
     var data = ""
-    val resources = new File(getClass.getResource("/dist").getPath).getParent
-    var urlCount:Int = 0
-    println("resource location"+resources)
+    var urlCount: Int = 0
+    println("resource location" + resources)
     try {
-      HttpServer.start("8000",resources+"/dist")
-      val urls = HttpServer.getFilesInDirectory("localhost","8000","")
+      HttpServer.start("8000", resources + "/dist")
+      val urls = HttpServer.getFilesInDirectory("localhost", "8000", "")
       urls.foreach(println)
       urlCount = urls.length
     }
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java
index 01fe266..1c6ed8c 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.amaterasu.sdk;
 
+import org.apache.amaterasu.common.configuration.ClusterConfig;
 import org.apache.amaterasu.common.dataobjects.ExecData;
 import org.apache.amaterasu.common.execution.actions.Notifier;
 
@@ -33,7 +34,7 @@ void init(ExecData data,
               ByteArrayOutputStream outStream,
               Notifier notifier,
               String executorId,
-              String propFile,
+              ClusterConfig config,
               String hostName);
 
     String getGroupIdentifier();

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services