Posted to dev@amaterasu.apache.org by GitBox <gi...@apache.org> on 2018/06/28 04:30:12 UTC

[GitHub] roadan closed pull request #24: AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own pro…

URL: https://github.com/apache/incubator-amaterasu/pull/24

This pull request was merged from a forked repository. GitHub hides the original
diff once such a PR is merged, so the diff is reproduced below for provenance:

diff --git a/build.gradle b/build.gradle
index 79b926f..8b56e00 100644
--- a/build.gradle
+++ b/build.gradle
@@ -25,10 +25,6 @@ allprojects {
     version '0.2.0-incubating-rc3'
 }
 
-project(':leader')
-project(':common')
-project(':executor')
-
 task copyLeagalFiles(type: Copy) {
     from "./DISCLAIMER", "./LICENSE", "./NOTICE"
     into "${buildDir}/amaterasu"
diff --git a/executor/build.gradle b/executor/build.gradle
index 21bc2b0..09e269c 100644
--- a/executor/build.gradle
+++ b/executor/build.gradle
@@ -54,7 +54,6 @@ dependencies {
 
     compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
     compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-    compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
     compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
     compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
     compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
@@ -75,18 +74,7 @@ dependencies {
     compile project(':common')
     compile project(':amaterasu-sdk')
 
-    //runtime dependency for spark
-    provided('org.apache.spark:spark-repl_2.11:2.2.1')
-    provided('org.apache.spark:spark-core_2.11:2.2.1')
-
-    testCompile project(':common')
-    testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
-    testRuntime 'org.pegdown:pegdown:1.1.0'
-    testCompile 'junit:junit:4.11'
-    testCompile 'org.scalatest:scalatest_2.11:3.0.2'
-    testCompile 'org.scala-lang:scala-library:2.11.8'
-    testCompile('org.apache.spark:spark-repl_2.11:2.2.1')
-    testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
+
 
 }
 
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
deleted file mode 100755
index 79fe18a..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-
-import java.io.File
-import java.util
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.SparkSession
-
-import scala.sys.process.{Process, ProcessLogger}
-
-
-
-
-class PySparkRunner extends AmaterasuRunner with Logging {
-
-  var proc: Process = _
-  var notifier: Notifier = _
-
-  override def getIdentifier: String = "pyspark"
-
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-    interpretSources(actionSource, actionName, exports)
-  }
-
-  def interpretSources(source: String, actionName: String, exports: util.Map[String, String]): Unit = {
-
-    PySparkEntryPoint.getExecutionQueue.setForExec((source, actionName, exports))
-    val resQueue = PySparkEntryPoint.getResultQueue(actionName)
-
-    notifier.info(s"================= started action $actionName =================")
-
-    var res: PySparkResult = null
-
-    do {
-      res = resQueue.getNext()
-      res.resultType match {
-        case ResultType.success =>
-          notifier.success(res.statement)
-        case ResultType.error =>
-          notifier.error(res.statement, res.message)
-          throw new Exception(res.message)
-        case ResultType.completion =>
-          notifier.info(s"================= finished action $actionName =================")
-      }
-    } while (res != null && res.resultType != ResultType.completion)
-  }
-
-}
-
-object PySparkRunner {
-
-  def collectCondaPackages(): String = {
-    val pkgsDirs = new File("./miniconda/pkgs")
-    (pkgsDirs.listFiles.filter {
-      file => file.getName.endsWith(".tar.bz2")
-    }.map {
-      file => s"./miniconda/pkgs/${file.getName}"
-    }.toBuffer ++ "dist/codegen.py").mkString(",")
-  }
-
-  def apply(env: Environment,
-            jobId: String,
-            notifier: Notifier,
-            spark: SparkSession,
-            pypath: String,
-            pyDeps: PythonDependencies,
-            config: ClusterConfig): PySparkRunner = {
-
-    val shellLoger = ProcessLogger(
-      (o: String) => println(o),
-      (e: String) => println(e)
-    )
-
-    //TODO: can we make this less ugly?
-
-
-    val result = new PySparkRunner
-
-    PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
-    val port = PySparkEntryPoint.getPort
-    var intpPath = ""
-    if (env.configuration.contains("cwd")) {
-      val cwd = new File(env.configuration("cwd"))
-      intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support test environment
-    } else {
-      intpPath = s"spark_intp.py"
-    }
-    var pysparkPath = ""
-    var condaPkgs = ""
-    if (pyDeps != null)
-      condaPkgs = collectCondaPackages()
-    var sparkCmd: Seq[String] = Seq()
-    config.mode match {
-      case "yarn" =>
-        pysparkPath = s"spark/bin/spark-submit"
-        sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
-        val proc = Process(sparkCmd, None,
-          "PYTHONPATH" -> pypath,
-          "PYTHONHASHSEED" -> 0.toString)
-
-        proc.run(shellLoger)
-      case "mesos" =>
-        pysparkPath = config.pysparkPath
-        if (pysparkPath.endsWith("spark-submit")) {
-          sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
-        }
-        else {
-          sparkCmd = Seq(pysparkPath, intpPath, port.toString)
-        }
-        var pysparkPython = "/usr/bin/python"
-
-        if (pyDeps != null &&
-          pyDeps.packages.nonEmpty) {
-          pysparkPython = "./miniconda/bin/python"
-        }
-        val proc = Process(sparkCmd, None,
-          "PYTHONPATH" -> pypath,
-          "PYSPARK_PYTHON" -> pysparkPython,
-        "PYTHONHASHSEED" -> 0.toString)
-
-        proc.run(shellLoger)
-    }
-
-    result.notifier = notifier
-
-    result
-  }
-
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
index 9ab75be..0acd1fe 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
@@ -26,7 +26,6 @@ import org.apache.amaterasu.executor.common.executors.ProvidersFactory
 import org.apache.mesos.Protos._
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver}
-import org.apache.spark.SparkContext
 
 import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -37,7 +36,6 @@ class MesosActionsExecutor extends Executor with Logging {
 
   var master: String = _
   var executorDriver: ExecutorDriver = _
-  var sc: SparkContext = _
   var jobId: String = _
   var actionName: String = _
   //  var sparkScalaRunner: SparkScalaRunner = _
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala b/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala
deleted file mode 100755
index a61cd5a..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.runtime
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-
-object AmaContext extends Logging {
-
-  var spark: SparkSession = _
-  var sc: SparkContext = _
-  var jobId: String = _
-  var env: Environment = _
-
-  def init(spark: SparkSession,
-           jobId: String,
-           env: Environment): Unit = {
-
-    AmaContext.spark = spark
-    AmaContext.sc = spark.sparkContext
-    AmaContext.jobId = jobId
-    AmaContext.env = env
-
-  }
-
-  def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = {
-
-    spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName")
-
-  }
-
-  def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = {
-
-    getDataFrame(actionName, dfName, format).as[T]
-
-  }
-
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index f4f553c..b5f8700 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -19,24 +19,18 @@ package org.apache.amaterasu.executor.yarn.executors
 import java.io.ByteArrayOutputStream
 import java.net.{InetAddress, URLDecoder}
 
-import scala.collection.JavaConverters._
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.executor.common.executors.{ActiveNotifier, ProvidersFactory}
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.spark.SparkContext
 
-import scala.reflect.internal.util.ScalaClassLoader
-import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader
+import scala.collection.JavaConverters._
 
 
 class ActionsExecutor extends Logging {
 
   var master: String = _
-  var sc: SparkContext = _
   var jobId: String = _
   var actionName: String = _
   var taskData: TaskData = _
diff --git a/frameworks/spark/runner/build.gradle b/frameworks/spark/runner/build.gradle
new file mode 100644
index 0000000..65d9bb4
--- /dev/null
+++ b/frameworks/spark/runner/build.gradle
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'com.github.johnrengelman.shadow' version '1.2.4'
+    id 'com.github.maiflai.scalatest' version '0.6-5-g9065d91'
+    id 'scala'
+    id 'java'
+}
+
+shadowJar {
+    zip64 true
+}
+
+repositories {
+    maven {
+        url "https://plugins.gradle.org/m2/"
+    }
+    mavenCentral()
+}
+
+test {
+    maxParallelForks = 1
+    forkEvery = 1
+}
+
+configurations {
+    provided
+    runtime.exclude module: 'hadoop-common'
+    runtime.exclude module: 'hadoop-yarn-api'
+    runtime.exclude module: 'hadoop-yarn-client'
+    runtime.exclude module: 'hadoop-hdfs'
+    runtime.exclude module: 'mesos'
+    runtime.exclude module: 'scala-compiler'
+}
+
+sourceSets {
+    main.compileClasspath += configurations.provided
+    test.compileClasspath += configurations.provided
+    test.runtimeClasspath += configurations.provided
+}
+
+dependencies {
+
+    compile project(':executor')
+    compile project(':spark-runtime')
+    provided('org.apache.spark:spark-repl_2.11:2.2.1')
+    provided('org.apache.spark:spark-core_2.11:2.2.1')
+
+    testCompile project(':common')
+    testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
+    testRuntime 'org.pegdown:pegdown:1.1.0'
+    testCompile 'junit:junit:4.11'
+    testCompile 'org.scalatest:scalatest_2.11:3.0.2'
+    testCompile('org.apache.spark:spark-repl_2.11:2.2.1')
+    testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
+
+}
+
+sourceSets {
+    test {
+        resources.srcDirs += [file('src/test/resources')]
+    }
+
+    main {
+        scala {
+            srcDirs = ['src/main/scala', 'src/main/java']
+        }
+        java {
+            srcDirs = []
+        }
+    }
+}
+
+test {
+
+    maxParallelForks = 1
+}
+
+task copyToHome(type: Copy) {
+    dependsOn shadowJar
+    from 'build/libs'
+    into '../../../build/amaterasu/dist'
+    from 'build/resources/main'
+    into '../../../build/amaterasu/dist'
+}
diff --git a/executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java b/frameworks/spark/runner/src/main/java/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkEntryPoint.java
similarity index 92%
rename from executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java
rename to frameworks/spark/runner/src/main/java/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkEntryPoint.java
index a521fce..4d98b79 100755
--- a/executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java
+++ b/frameworks/spark/runner/src/main/java/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkEntryPoint.java
@@ -14,17 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark;
+package org.apache.amaterasu.executor.runner.spark.pyspark;
 
-import org.apache.amaterasu.executor.runtime.AmaContext;
 import org.apache.amaterasu.common.runtime.Environment;
-
+import org.apache.amaterasu.executor.runtime.spark.AmaContext;
+import org.apache.spark.SparkConf;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-
 import org.apache.spark.sql.SparkSession;
 import py4j.GatewayServer;
 
@@ -35,7 +32,7 @@
 public class PySparkEntryPoint {
 
     //private static Boolean started = false;
-    private static PySparkExecutionQueue queue = new PySparkExecutionQueue();
+    private static  PySparkExecutionQueue queue = new PySparkExecutionQueue();
     private static ConcurrentHashMap<String, ResultQueue> resultQueues = new ConcurrentHashMap<>();
 
     private static int port = 0;
diff --git a/executor/src/main/resources/codegen.py b/frameworks/spark/runner/src/main/resources/codegen.py
similarity index 100%
rename from executor/src/main/resources/codegen.py
rename to frameworks/spark/runner/src/main/resources/codegen.py
diff --git a/executor/src/main/resources/runtime.py b/frameworks/spark/runner/src/main/resources/runtime.py
similarity index 100%
rename from executor/src/main/resources/runtime.py
rename to frameworks/spark/runner/src/main/resources/runtime.py
diff --git a/executor/src/main/resources/spark-version-info.properties b/frameworks/spark/runner/src/main/resources/spark-version-info.properties
similarity index 100%
rename from executor/src/main/resources/spark-version-info.properties
rename to frameworks/spark/runner/src/main/resources/spark-version-info.properties
diff --git a/executor/src/main/resources/spark_intp.py b/frameworks/spark/runner/src/main/resources/spark_intp.py
similarity index 100%
rename from executor/src/main/resources/spark_intp.py
rename to frameworks/spark/runner/src/main/resources/spark_intp.py
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/SparkRunnersProvider.scala
similarity index 95%
rename from executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
rename to frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/SparkRunnersProvider.scala
index ba7ff03..3b07ec5 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/SparkRunnersProvider.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.executor.execution.actions.runners.spark
+package org.apache.amaterasu.executor.runner.spark
 
 import java.io._
 
@@ -24,10 +24,10 @@ import org.apache.amaterasu.common.dataobjects.ExecData
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner
-import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner
+import org.apache.amaterasu.executor.runner.spark.pyspark.PySparkRunner
+import org.apache.amaterasu.executor.runner.spark.repl.{SparkRunnerHelper, SparkScalaRunner}
+import org.apache.amaterasu.executor.runner.spark.sparksql.SparkSqlRunner
 import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
-import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner}
 import org.eclipse.aether.util.artifact.JavaScopes
 import org.sonatype.aether.repository.RemoteRepository
 import org.sonatype.aether.util.artifact.DefaultArtifact
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkExecutionQueue.scala
similarity index 94%
rename from executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala
rename to frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkExecutionQueue.scala
index 411069a..10d1ce9 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkExecutionQueue.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
+package org.apache.amaterasu.executor.runner.spark.pyspark
 
 import java.util
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkResultQueue.scala
similarity index 85%
rename from executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala
rename to frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkResultQueue.scala
index 6dbd445..0348979 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkResultQueue.scala
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
+package org.apache.amaterasu.executor.runner.spark.pyspark
 
-import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.ResultType.ResultType
+import org.apache.amaterasu.executor.runner.spark.pyspark.ResultType.ResultType
 
 object ResultType extends Enumeration {
   type ResultType = Value
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkRunner.scala
new file mode 100644
index 0000000..7089a0f
--- /dev/null
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/PySparkRunner.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.executor.runner.spark.pyspark
+
+import java.io.{File, PrintWriter, StringWriter}
+import java.util
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.execution.actions.Notifier
+import org.apache.amaterasu.common.execution.dependencies.{PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.sdk.AmaterasuRunner
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.SparkSession
+
+import scala.sys.process.Process
+
+
+class PySparkRunner extends AmaterasuRunner with Logging {
+
+  var proc: Process = _
+  var notifier: Notifier = _
+
+  override def getIdentifier: String = "pyspark"
+
+  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
+    interpretSources(actionSource, actionName, exports)
+  }
+
+  def interpretSources(source: String, actionName: String, exports: util.Map[String, String]): Unit = {
+
+    PySparkEntryPoint.getExecutionQueue.setForExec((source, actionName, exports))
+    val resQueue = PySparkEntryPoint.getResultQueue(actionName)
+
+    notifier.info(s"================= started action $actionName =================")
+
+    var res: PySparkResult = null
+
+    do {
+      res = resQueue.getNext()
+      res.resultType match {
+        case ResultType.success =>
+          notifier.success(res.statement)
+        case ResultType.error =>
+          notifier.error(res.statement, res.message)
+          throw new Exception(res.message)
+        case ResultType.completion =>
+          notifier.info(s"================= finished action $actionName =================")
+      }
+    } while (res != null && res.resultType != ResultType.completion)
+  }
+
+}
+
+object PySparkRunner {
+
+  def apply(env: Environment,
+            jobId: String,
+            notifier: Notifier,
+            spark: SparkSession,
+            pypath: String,
+            pyDeps: PythonDependencies,
+            config: ClusterConfig): PySparkRunner = {
+
+    //TODO: can we make this less ugly?
+    var pysparkPython = "/usr/bin/python"
+
+    if (pyDeps != null &&
+      pyDeps.packages.nonEmpty) {
+      loadPythonDependencies(pyDeps, notifier)
+      pysparkPython = "miniconda/bin/python"
+    }
+
+    val result = new PySparkRunner
+
+    PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
+    val port = PySparkEntryPoint.getPort
+    var intpPath = ""
+    if (env.configuration.contains("cwd")) {
+      val cwd = new File(env.configuration("cwd"))
+      intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support test environment
+    } else {
+      intpPath = s"spark_intp.py"
+    }
+    var pysparkPath = ""
+    if (env.configuration.contains("pysparkPath")) {
+      pysparkPath = env.configuration("pysparkPath")
+    } else {
+      pysparkPath = s"${config.spark.home}/bin/spark-submit"
+    }
+    val proc = Process(Seq(pysparkPath, intpPath, port.toString), None,
+      "PYTHONPATH" -> pypath,
+      "PYSPARK_PYTHON" -> pysparkPython,
+      "PYTHONHASHSEED" -> 0.toString) #> System.out
+
+    proc.run()
+
+
+    result.notifier = notifier
+
+    result
+  }
+
+  /**
+    * This installs the required python dependencies.
+    * We basically need 2 packages to make pyspark work with customer's scripts:
+    * 1. py4j - supplied by spark, for communication between Python and Java runtimes.
+    * 2. codegen - for dynamically parsing and converting customer's scripts into executable Python code objects.
+    * Currently we only know how to install packages using Anaconda, the reason is 3rd party OS libraries, e.g. libevent
+    * Anaconda has the capabilities to automatically resolve the required OS libraries per Python package and install them.
+    *
+    * TODO - figure out if we really want to support pip directly, or if Anaconda is enough.
+    * @param deps All of the customer's supplied Python dependencies, this currently comes from job-repo/deps/python.yml
+    * @param notifier
+    */
+  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
+    notifier.info("loading anaconda evn")
+    installAnacondaOnNode()
+    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
+    installAnacondaPackage(codegenPackage)
+    try {
+      deps.packages.foreach(pack => {
+        pack.index.getOrElse("anaconda").toLowerCase match {
+          case "anaconda" => installAnacondaPackage(pack)
+          // case "pypi" => installPyPiPackage(pack) TODO: See if we can support this
+        }
+      })
+    }
+    catch {
+
+      case rte: RuntimeException =>
+        val sw = new StringWriter
+        rte.printStackTrace(new PrintWriter(sw))
+        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
+      case e: Exception =>
+        val sw = new StringWriter
+        e.printStackTrace(new PrintWriter(sw))
+        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
+    }
+  }
+
+
+  /**
+    * Installs one python package using Anaconda.
+    * Anaconda works with multiple channels, or better called, repositories.
+    * Normally, if a channel isn't specified, Anaconda will fetch the package from the default conda channel.
+    * The reason we need to use channels, is that sometimes the required package doesn't exist on the default channel.
+    * @param pythonPackage This comes from parsing the python.yml dep file.
+    */
+  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
+    val channel = pythonPackage.channel.getOrElse("anaconda")
+    if (channel == "anaconda") {
+      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}")
+    } else {
+      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}")
+    }
+  }
+
+  /**
+    * Installs Anaconda and then links it with the local spark that was installed on the executor.
+    */
+  private def installAnacondaOnNode(): Unit = {
+    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda")
+    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build")
+    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
+  }
+
+
+}
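
Note on the two install helpers above: as written, installAnacondaPackage and
installAnacondaOnNode construct the bash command sequences but do not run them;
executing such a Seq goes through scala.sys.process. A minimal, illustrative
sketch of how one of these commands could be executed (the package name and the
logger are examples, not part of the PR):

    import scala.sys.process.{Process, ProcessLogger}

    // Example command, in the same shape as the ones built by installAnacondaPackage.
    val cmd = Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y codegen")

    // Forward the child process output so failures are visible in the executor logs.
    val logger = ProcessLogger(out => println(out), err => println(err))

    // '!' blocks until the command exits and returns its exit code.
    val exitCode = Process(cmd).!(logger)
    if (exitCode != 0) println(s"conda install failed with exit code $exitCode")
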
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/ResultQueue.scala
similarity index 94%
rename from executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala
rename to frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/ResultQueue.scala
index 3ac7bd7..9a5763d 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/pyspark/ResultQueue.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
+package org.apache.amaterasu.executor.runner.spark.pyspark
 
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/repl/AmaSparkILoop.scala
similarity index 95%
rename from executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala
rename to frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/repl/AmaSparkILoop.scala
index 19ef3de..a4ef5f4 100755
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/repl/AmaSparkILoop.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.repl.amaterasu
+package org.apache.amaterasu.executor.runner.spark.repl
 
 import java.io.PrintWriter
 
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/repl/SparkRunnerHelper.scala
similarity index 93%
rename from executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
rename to frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/repl/SparkRunnerHelper.scala
index f2c2afa..5b1c64a 100644
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/repl/SparkRunnerHelper.scala
@@ -14,19 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.repl.amaterasu.runners.spark
+package org.apache.amaterasu.executor.runner.spark.repl
 
 import java.io.{ByteArrayOutputStream, File, PrintWriter}
+import java.nio.file.{Files, Paths}
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.common.utils.FileUtils
-import org.apache.spark.repl.amaterasu.AmaSparkILoop
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
 
 import scala.tools.nsc.GenericRunnerSettings
 import scala.tools.nsc.interpreter.IMain
@@ -34,8 +34,9 @@ import scala.tools.nsc.interpreter.IMain
 object SparkRunnerHelper extends Logging {
 
   private val conf = new SparkConf()
-  private val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
-  private val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
+  private val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
+  private val outputDir = Files.createTempDirectory(Paths.get(rootDir), "repl").toFile
+  outputDir.deleteOnExit()
 
   private var sparkSession: SparkSession = _
 
@@ -145,16 +146,16 @@ object SparkRunnerHelper extends Logging {
       case "yarn" =>
         conf.set("spark.home", config.spark.home)
           // TODO: parameterize those
-          .setJars(s"executor.jar" +: jars)
+          .setJars(Seq("executor.jar", "spark-runner.jar", "spark-runtime.jar") ++ jars)
           .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
           .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
           .set("spark.yarn.queue", "default")
           .set("spark.history.kerberos.principal", "none")
 
           .set("spark.master", master)
-          .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
+          .set("spark.executor.instances", "1") // TODO: change this
           .set("spark.yarn.jars", s"spark/jars/*")
-          .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
+          .set("spark.executor.memory", "1g")
           .set("spark.dynamicAllocation.enabled", "false")
           .set("spark.eventLog.enabled", "false")
           .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/repl/SparkScalaRunner.scala
similarity index 98%
rename from executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
rename to frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/repl/SparkScalaRunner.scala
index a45b8c0..f9728e5 100755
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/repl/SparkScalaRunner.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.repl.amaterasu.runners.spark
+package org.apache.amaterasu.executor.runner.spark.repl
 
 import java.io.ByteArrayOutputStream
 import java.util
@@ -22,7 +22,7 @@ import java.util
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.runtime.AmaContext
+import org.apache.amaterasu.executor.runtime.spark.AmaContext
 import org.apache.amaterasu.sdk.AmaterasuRunner
 import org.apache.spark.sql.{Dataset, SparkSession}
 
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/sparkr/SparkRRunner.scala
similarity index 69%
rename from executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala
rename to frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/sparkr/SparkRRunner.scala
index d111cfb..c1573d6 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/sparkr/SparkRRunner.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.executor.execution.actions.runners.spark
+package org.apache.amaterasu.executor.runner.spark.sparkr
 
 import java.io.ByteArrayOutputStream
 import java.util
@@ -28,21 +28,21 @@ import org.apache.spark.SparkContext
 
 class SparkRRunner extends Logging with AmaterasuRunner {
 
-  override def getIdentifier = "spark-r"
+    override def getIdentifier = "spark-r"
 
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-  }
+    override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
+    }
 }
 
 object SparkRRunner {
-  def apply(
-    env: Environment,
-    jobId: String,
-    sparkContext: SparkContext,
-    outStream: ByteArrayOutputStream,
-    notifier: Notifier,
-    jars: Seq[String]
-  ): SparkRRunner = {
-    new SparkRRunner()
-  }
+    def apply(
+               env: Environment,
+               jobId: String,
+               sparkContext: SparkContext,
+               outStream: ByteArrayOutputStream,
+               notifier: Notifier,
+               jars: Seq[String]
+             ): SparkRRunner = {
+        new SparkRRunner()
+    }
 }
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/sparksql/SparkSqlRunner.scala
similarity index 96%
rename from executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
rename to frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/sparksql/SparkSqlRunner.scala
index 350ddb4..27164cc 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/sparksql/SparkSqlRunner.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql
+package org.apache.amaterasu.executor.runner.spark.sparksql
 
 import java.io.File
 import java.util
@@ -22,10 +22,11 @@ import java.util
 import org.apache.amaterasu.common.execution.actions.Notifier
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.runtime.AmaContext
+import org.apache.amaterasu.executor.runtime.spark.AmaContext
 import org.apache.amaterasu.sdk.AmaterasuRunner
 import org.apache.commons.io.FilenameUtils
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
 import scala.collection.JavaConverters._
 
 /**
@@ -101,8 +102,8 @@ class SparkSqlRunner extends Logging with AmaterasuRunner {
 
         try{
 
-        result = spark.sql(parsedQuery)
-        notifier.success(parsedQuery)
+          result = spark.sql(parsedQuery)
+          notifier.success(parsedQuery)
         } catch {
           case e: Exception => notifier.error(parsedQuery, e.getMessage)
         }
diff --git a/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
similarity index 100%
rename from executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
rename to frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
diff --git a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json b/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json
similarity index 100%
rename from executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
rename to frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json
diff --git a/executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
similarity index 100%
rename from executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
rename to frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
diff --git a/executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
similarity index 100%
rename from executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
rename to frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
diff --git a/executor/src/test/resources/SparkSql/parquet/_SUCCESS b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS
similarity index 100%
rename from executor/src/test/resources/SparkSql/parquet/_SUCCESS
rename to frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS
diff --git a/executor/src/test/resources/SparkSql/parquet/_common_metadata b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata
similarity index 100%
rename from executor/src/test/resources/SparkSql/parquet/_common_metadata
rename to frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata
diff --git a/executor/src/test/resources/SparkSql/parquet/_metadata b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata
similarity index 100%
rename from executor/src/test/resources/SparkSql/parquet/_metadata
rename to frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata
diff --git a/executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
similarity index 100%
rename from executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
rename to frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
diff --git a/executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
similarity index 100%
rename from executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
rename to frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
diff --git a/executor/src/test/resources/amaterasu.properties b/frameworks/spark/runner/src/test/resources/amaterasu.properties
similarity index 100%
rename from executor/src/test/resources/amaterasu.properties
rename to frameworks/spark/runner/src/test/resources/amaterasu.properties
diff --git a/executor/src/test/resources/codegen.py b/frameworks/spark/runner/src/test/resources/codegen.py
similarity index 100%
rename from executor/src/test/resources/codegen.py
rename to frameworks/spark/runner/src/test/resources/codegen.py
diff --git a/executor/src/test/resources/py4j-0.10.4-src.zip b/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip
similarity index 100%
rename from executor/src/test/resources/py4j-0.10.4-src.zip
rename to frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip
diff --git a/executor/src/test/resources/py4j.tar.gz b/frameworks/spark/runner/src/test/resources/py4j.tar.gz
similarity index 100%
rename from executor/src/test/resources/py4j.tar.gz
rename to frameworks/spark/runner/src/test/resources/py4j.tar.gz
diff --git a/executor/src/test/resources/pyspark-with-amacontext.py b/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py
similarity index 100%
rename from executor/src/test/resources/pyspark-with-amacontext.py
rename to frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py
diff --git a/executor/src/test/resources/pyspark.tar.gz b/frameworks/spark/runner/src/test/resources/pyspark.tar.gz
similarity index 100%
rename from executor/src/test/resources/pyspark.tar.gz
rename to frameworks/spark/runner/src/test/resources/pyspark.tar.gz
diff --git a/executor/src/test/resources/pyspark.zip b/frameworks/spark/runner/src/test/resources/pyspark.zip
similarity index 100%
rename from executor/src/test/resources/pyspark.zip
rename to frameworks/spark/runner/src/test/resources/pyspark.zip
diff --git a/executor/src/test/resources/runtime.py b/frameworks/spark/runner/src/test/resources/runtime.py
similarity index 100%
rename from executor/src/test/resources/runtime.py
rename to frameworks/spark/runner/src/test/resources/runtime.py
diff --git a/executor/src/test/resources/simple-pyspark.py b/frameworks/spark/runner/src/test/resources/simple-pyspark.py
similarity index 100%
rename from executor/src/test/resources/simple-pyspark.py
rename to frameworks/spark/runner/src/test/resources/simple-pyspark.py
diff --git a/executor/src/test/resources/simple-python-err.py b/frameworks/spark/runner/src/test/resources/simple-python-err.py
similarity index 100%
rename from executor/src/test/resources/simple-python-err.py
rename to frameworks/spark/runner/src/test/resources/simple-python-err.py
diff --git a/executor/src/test/resources/simple-python.py b/frameworks/spark/runner/src/test/resources/simple-python.py
similarity index 100%
rename from executor/src/test/resources/simple-python.py
rename to frameworks/spark/runner/src/test/resources/simple-python.py
diff --git a/executor/src/test/resources/simple-spark.scala b/frameworks/spark/runner/src/test/resources/simple-spark.scala
similarity index 100%
rename from executor/src/test/resources/simple-spark.scala
rename to frameworks/spark/runner/src/test/resources/simple-spark.scala
diff --git a/executor/src/test/resources/spark_intp.py b/frameworks/spark/runner/src/test/resources/spark_intp.py
similarity index 100%
rename from executor/src/test/resources/spark_intp.py
rename to frameworks/spark/runner/src/test/resources/spark_intp.py
diff --git a/executor/src/test/resources/step-2.scala b/frameworks/spark/runner/src/test/resources/step-2.scala
similarity index 100%
rename from executor/src/test/resources/step-2.scala
rename to frameworks/spark/runner/src/test/resources/step-2.scala
diff --git a/executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala
similarity index 100%
rename from executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala
rename to frameworks/spark/runner/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
similarity index 96%
rename from executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
rename to frameworks/spark/runner/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
index b11a4f9..eaaed54 100644
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
+++ b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
@@ -52,7 +52,7 @@ class SparkTestsSuite extends Suites(
   override def beforeAll(): Unit = {
 
     // I can't apologise enough for this
-    val resources = new File(getClass.getResource("/spark_intp.py").getPath).getParent
+    val resources = new File(getClass.getResource("/frameworks/spark/runner/src/test/resources/spark_intp.py").getPath).getParent
     val workDir = new File(resources).getParentFile.getParent
 
     env = Environment()
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/spark/pyspark/PySparkRunnerTests.scala
similarity index 100%
rename from executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala
rename to frameworks/spark/runner/src/test/scala/org/apache/amaterasu/spark/pyspark/PySparkRunnerTests.scala
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/spark/repl/SparkScalaRunnerTests.scala
similarity index 100%
rename from executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
rename to frameworks/spark/runner/src/test/scala/org/apache/amaterasu/spark/repl/SparkScalaRunnerTests.scala
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/spark/sparksql/SparkSqlRunnerTests.scala
similarity index 100%
rename from executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
rename to frameworks/spark/runner/src/test/scala/org/apache/amaterasu/spark/sparksql/SparkSqlRunnerTests.scala
diff --git a/executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
similarity index 100%
rename from executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
rename to frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
diff --git a/frameworks/spark/runtime/build.gradle b/frameworks/spark/runtime/build.gradle
new file mode 100644
index 0000000..9bba2e4
--- /dev/null
+++ b/frameworks/spark/runtime/build.gradle
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'com.github.johnrengelman.shadow' version '1.2.4'
+    id 'com.github.maiflai.scalatest' version '0.6-5-g9065d91'
+    id 'scala'
+    id 'java'
+}
+
+shadowJar {
+    zip64 true
+}
+
+repositories {
+    maven {
+        url "https://plugins.gradle.org/m2/"
+    }
+    mavenCentral()
+}
+
+test {
+    maxParallelForks = 1
+    forkEvery = 1
+}
+
+configurations {
+    provided
+    runtime.exclude module: 'hadoop-common'
+    runtime.exclude module: 'hadoop-yarn-api'
+    runtime.exclude module: 'hadoop-yarn-client'
+    runtime.exclude module: 'hadoop-hdfs'
+    runtime.exclude module: 'mesos'
+    runtime.exclude module: 'scala-compiler'
+}
+
+sourceSets {
+    main.compileClasspath += configurations.provided
+    test.compileClasspath += configurations.provided
+    test.runtimeClasspath += configurations.provided
+}
+
+dependencies {
+
+    compile project(':executor')
+    provided('org.apache.spark:spark-repl_2.11:2.2.1')
+    provided('org.apache.spark:spark-core_2.11:2.2.1')
+
+}
+
+sourceSets {
+    test {
+        resources.srcDirs += [file('src/test/resources')]
+    }
+
+    main {
+        scala {
+            srcDirs = ['src/main/scala', 'src/main/java']
+        }
+        java {
+            srcDirs = []
+        }
+    }
+}
+
+test {
+
+    maxParallelForks = 1
+}
+
+task copyToHome(type: Copy) {
+    from 'build/libs'
+    into '../../../build/amaterasu/dist'
+    from 'build/resources/main'
+    into '../../../build/amaterasu/dist'
+}
diff --git a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/executor/runtime/spark/AmaContext.scala b/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/executor/runtime/spark/AmaContext.scala
new file mode 100644
index 0000000..babd1f7
--- /dev/null
+++ b/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/executor/runtime/spark/AmaContext.scala
@@ -0,0 +1,37 @@
+package org.apache.amaterasu.executor.runtime.spark
+
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession}
+
+/**
+  * @author Arun Manivannan
+  */
+object AmaContext extends Logging {
+
+  var spark: SparkSession = _
+  var sc: SparkContext = _
+  var jobId: String = _
+  var env: Environment = _
+
+  def init(spark: SparkSession,
+           jobId: String,
+           env: Environment): Unit = {
+
+    AmaContext.spark = spark
+    AmaContext.sc = spark.sparkContext
+    AmaContext.jobId = jobId
+    AmaContext.env = env
+
+  }
+
+  def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = {
+    spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName")
+  }
+
+  def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = {
+    getDataFrame(actionName, dfName, format).as[T]
+  }
+
+}
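
For orientation, AmaContext is the runtime entry point that action code uses to
share datasets between actions: getDataFrame loads whatever a previous action
persisted under ${env.workingDir}/$jobId/$actionName/$dfName. A hypothetical
action snippet using this API (the action and dataset names are illustrative,
and the runner is assumed to have already called AmaContext.init):

    import org.apache.amaterasu.executor.runtime.spark.AmaContext
    import org.apache.spark.sql.SaveMode

    // Load a DataFrame persisted by an earlier action named "start" as dataset "x".
    val input = AmaContext.getDataFrame("start", "x")
    input.show()

    // Persist output under the same layout so a later action can read it back
    // with AmaContext.getDataFrame("step2", "result").
    input.write.mode(SaveMode.Overwrite)
      .parquet(s"${AmaContext.env.workingDir}/${AmaContext.jobId}/step2/result")
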
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 1828100..bfa00b7 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -53,6 +53,7 @@ import scala.collection.{concurrent, mutable}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.util.{Failure, Success}
+import scala.collection.JavaConverters._
 
 class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
 
@@ -69,13 +70,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
   private var conf: YarnConfiguration = _
   private var propPath: String = ""
   private var props: InputStream = _
-  private var jarPath: Path = _
-  private var executorPath: Path = _
-  private var executorJar: LocalResource = _
-  private var propFile: LocalResource = _
-  private var log4jPropFile: LocalResource = _
   private var nmClient: NMClientAsync = _
-  private var allocListener: YarnRMCallbackHandler = _
   private var rmClient: AMRMClientAsync[ContainerRequest] = _
   private var address: String = _
 
@@ -133,15 +128,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
 
     log.info(s"Job ${jobManager.jobId} initiated with ${jobManager.registeredActions.size} actions")
 
-    jarPath = new Path(config.YARN.hdfsJarsPath)
-
-    // TODO: change this to read all dist folder and add to exec path
-    executorPath = Path.mergePaths(jarPath, new Path(s"/dist/executor-${config.version}-all.jar"))
-    log.info("Executor jar path is {}", executorPath)
-    executorJar = setLocalResourceFromPath(executorPath)
-    propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")))
-    log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")))
-
     log.info("Started execute")
 
     nmClient = new NMClientAsyncImpl(new YarnNMCallbackHandler())
@@ -150,9 +136,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
     nmClient.init(conf)
     nmClient.start()
 
-    // TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
-    allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
-
     rmClient = startRMClient()
     val registrationResponse = registerAppMaster("", 0, "")
     val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
@@ -250,10 +233,10 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
         val commands: List[String] = List(
           "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ",
           s"/bin/bash spark/bin/load-spark-env.sh && ",
-          s"java -cp spark/jars/*:executor.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
+          s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
             "-Xmx1G " +
             "-Dscala.usejavacp=true " +
-            "-Dhdp.version=2.6.1.0-129 " +
+            "-Dhdp.version=2.6.5.0-292 " +
             "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
             s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(taskData, "UTF-8")}' '${URLEncoder.encode(execData, "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}' '$address' " +
             s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
@@ -266,22 +249,37 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
         ctx.setCommands(commands)
         ctx.setTokens(allTokens)
 
+        val yarnJarPath = new Path(config.YARN.hdfsJarsPath)
+
+        //TODO Arun - Remove the hardcoding of the dist path
+        /*  val resources = mutable.Map[String, LocalResource]()
+          val binaryFileIter = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false)
+          while (binaryFileIter.hasNext) {
+            val eachFile = binaryFileIter.next().getPath
+            resources (eachFile.getName) = setLocalResourceFromPath(fs.makeQualified(eachFile))
+          }
+          resources("log4j.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/log4j.properties")))
+          resources ("amaterasu.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/amaterasu.properties")))*/
+
         val resources = mutable.Map[String, LocalResource](
-          "executor.jar" -> executorJar,
-          "amaterasu.properties" -> propFile,
+          "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))),
+          "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))),
+          "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))),
+          "amaterasu.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/amaterasu.properties"))),
+          "log4j.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/log4j.properties"))),
           // TODO: Nadav/Eyal all of these should move to the executor resource setup
-          "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/Miniconda2-latest-Linux-x86_64.sh"))),
-          "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/codegen.py"))),
-          "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/runtime.py"))),
-          "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark-version-info.properties"))),
-          "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark_intp.py"))))
+          "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/miniconda.sh"))),
+          "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/codegen.py"))),
+          "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/runtime.py"))),
+          "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
+          "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
 
         val frameworkFactory = FrameworkProvidersFactory(env, config)
         val framework = frameworkFactory.getFramework(actionData.groupId)
 
         //adding the framework and executor resources
-        setupResources(framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
-        setupResources(s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}")
+        setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
+        setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}")
 
         ctx.setLocalResources(resources)
 
@@ -327,9 +325,9 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
     ByteBuffer.wrap(dob.getData, 0, dob.getLength)
   }
 
-  private def setupResources(frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
+  private def setupResources(yarnJarPath: Path, frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
 
-    val sourcePath = Path.mergePaths(jarPath, new Path(s"/$resourcesPath"))
+    val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath"))
 
     if (fs.exists(sourcePath)) {
 
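   (Editor's note: the TODO left commented out above could be resolved roughly as sketched below — an assumption, not part of the PR. It reuses the fs, config and setLocalResourceFromPath members already present in this class, and the mutable/Path/LocalResource imports already in scope.)

   // Sketch only: localize every file under <hdfsJarsPath>/dist into the container
   // under its own name, then add the two property files explicitly.
   val resources = mutable.Map[String, LocalResource]()
   val distFiles = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false)
   while (distFiles.hasNext) {
     val file = distFiles.next().getPath
     resources(file.getName) = setLocalResourceFromPath(fs.makeQualified(file))
   }
   resources("amaterasu.properties") = setLocalResourceFromPath(
     fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/amaterasu.properties")))
   resources("log4j.properties") = setLocalResourceFromPath(
     fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/log4j.properties")))
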
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
deleted file mode 100644
index 70da38e..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn
-
-import java.util
-import java.util.Collections
-import java.util.concurrent.ConcurrentHashMap
-
-import com.google.gson.Gson
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.execution.JobManager
-import org.apache.amaterasu.leader.utilities.DataLoader
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
-import org.apache.hadoop.yarn.util.Records
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.concurrent
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-import scala.concurrent._
-import ExecutionContext.Implicits.global
-
-class YarnRMCallbackHandler(nmClient: NMClientAsync,
-                            jobManager: JobManager,
-                            env: String,
-                            awsEnv: String,
-                            config: ClusterConfig,
-                            executorJar: LocalResource) extends AMRMClientAsync.CallbackHandler with Logging {
-
-
-  val gson:Gson = new Gson()
-  private val containersIdsToTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
-  private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
-  private val failedTasksCounter: concurrent.Map[String, Int] = new ConcurrentHashMap[String, Int].asScala
-
-
-  override def onError(e: Throwable): Unit = {
-    println(s"ERROR: ${e.getMessage}")
-  }
-
-  override def onShutdownRequest(): Unit = {
-    println("Shutdown requested")
-  }
-
-  val MAX_ATTEMPTS_PER_TASK = 3
-
-  override def onContainersCompleted(statuses: util.List[ContainerStatus]): Unit = {
-    for (status <- statuses.asScala) {
-      if (status.getState == ContainerState.COMPLETE) {
-        val containerId = status.getContainerId.getContainerId
-        val taskId = containersIdsToTaskIds(containerId)
-        if (status.getExitStatus == 0) {
-          completedContainersAndTaskIds.put(containerId, taskId)
-          log.info(s"Container $containerId completed with task $taskId with success.")
-        } else {
-          log.warn(s"Container $containerId completed with task $taskId with failed status code (${status.getExitStatus}.")
-          val failedTries = failedTasksCounter.getOrElse(taskId, 0)
-          if (failedTries < MAX_ATTEMPTS_PER_TASK) {
-            // TODO: notify and ask for a new container
-            log.info("Retrying task")
-          } else {
-            log.error(s"Already tried task $taskId $MAX_ATTEMPTS_PER_TASK times. Time to say Bye-Bye.")
-            // TODO: die already
-          }
-        }
-      }
-    }
-    if (getProgress == 1F) {
-      log.info("Finished all tasks successfully! Wow!")
-    }
-  }
-
-  override def getProgress: Float = {
-    jobManager.registeredActions.size.toFloat / completedContainersAndTaskIds.size
-  }
-
-  override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
-  }
-
-  override def onContainersAllocated(containers: util.List[Container]): Unit = {
-    log.info("containers allocated")
-    for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
-      val containerTask = Future[String] {
-
-        val actionData = jobManager.getNextActionData
-        val taskData = DataLoader.getTaskData(actionData, env)
-        val execData = DataLoader.getExecutorData(env, config)
-
-        val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-        val command = s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")}
-                         | env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz
-                         | java -cp executor.jar:spark-${config.Webserver.sparkVersion}/lib/*
-                         | -Dscala.usejavacp=true
-                         | -Djava.library.path=/usr/lib org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher
-                         | ${jobManager.jobId} ${config.master} ${actionData.name} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin
-        ctx.setCommands(Collections.singletonList(command))
-
-        ctx.setLocalResources(Map[String, LocalResource] (
-          "executor.jar" -> executorJar
-        ))
-
-        nmClient.startContainerAsync(container, ctx)
-        actionData.id
-      }
-
-      containerTask onComplete {
-        case Failure(t) => {
-          println(s"launching container failed: ${t.getMessage}")
-        }
-
-        case Success(actionDataId) => {
-          containersIdsToTaskIds.put(container.getId.getContainerId, actionDataId)
-          println(s"launching container succeeded: ${container.getId}")
-        }
-      }
-    }
-  }
-}
diff --git a/leader/src/main/scripts/ama-start-mesos.sh b/leader/src/main/scripts/ama-start-mesos.sh
index 18dbed9..e01ea42 100755
--- a/leader/src/main/scripts/ama-start-mesos.sh
+++ b/leader/src/main/scripts/ama-start-mesos.sh
@@ -126,9 +126,9 @@ if ! ls ${BASEDIR}/dist/spark*.tgz 1> /dev/null 2>&1; then
     #wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
     wget http://apache.mirror.digitalpacific.com.au/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
 fi
-if [ ! -f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then
+if [ ! -f ${BASEDIR}/dist/miniconda.sh ]; then
     echo "${bold}Fetching miniconda distributable ${NC}"
-    wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -P ${BASEDIR}/dist
+    wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -O ${BASEDIR}/dist/miniconda.sh
 fi
 cp ${BASEDIR}/amaterasu.properties ${BASEDIR}/dist
 eval $CMD | grep "===>"
diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader/src/main/scripts/ama-start-yarn.sh
index c0f8d52..1419371 100755
--- a/leader/src/main/scripts/ama-start-yarn.sh
+++ b/leader/src/main/scripts/ama-start-yarn.sh
@@ -136,9 +136,9 @@ fi
 
 echo $CMD
 
-if [ ! -f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then
+if [ ! -f ${BASEDIR}/dist/miniconda.sh ]; then
     echo "${bold}Fetching miniconda distributable ${NC}"
-    wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -P ${BASEDIR}/dist
+    wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -O ${BASEDIR}/dist/miniconda.sh
 fi
 
 
diff --git a/leader/src/main/scripts/amaterasu.properties b/leader/src/main/scripts/amaterasu.properties
index 5cd6638..cb9e896 100755
--- a/leader/src/main/scripts/amaterasu.properties
+++ b/leader/src/main/scripts/amaterasu.properties
@@ -12,10 +12,10 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
-zk=127.0.0.1
+zk=192.168.1.90
 version=0.2.0-incubating-rc3
-master=192.168.33.11
-user=root
+master=192.168.1.90
+user=amaterasu
 mode=yarn
 webserver.port=8000
 webserver.root=dist
@@ -25,5 +25,9 @@ yarn.jarspath=hdfs:///apps/amaterasu
 spark.home=/usr/hdp/current/spark2-client
 #spark.home=/opt/cloudera/parcels/SPARK2-2.1.0.cloudera2-1.cdh5.7.0.p0.171658/lib/spark2
 yarn.hadoop.home.dir=/etc/hadoop
-spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
-spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
+#spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
+#spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
+spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.5.0-292"
+spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.5.0-292"
+amaterasu.executor.extra.java.opts = "-Xmx1G -Dscala.usejavacp=true -Dhdp.version=2.6.5.0-292"
+
diff --git a/settings.gradle b/settings.gradle
index 1056e01..c222795 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,8 +15,20 @@
  * limitations under the License.
  */
 include 'leader'
-include 'executor'
+project(':leader')
+
 include 'common'
+project(':common')
+
+include 'executor'
+project(':executor')
+
 include 'sdk'
 findProject(':sdk')?.name = 'amaterasu-sdk'
 
+//Spark
+include 'spark-runner'
+project(':spark-runner').projectDir=file("frameworks/spark/runner")
+include 'spark-runtime'
+project(':spark-runtime').projectDir=file("frameworks/spark/runtime")
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services