Posted to dev@amaterasu.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/06/09 04:35:00 UTC

[jira] [Commented] (AMATERASU-24) Refactor Spark out of Amaterasu executor to its own project

    [ https://issues.apache.org/jira/browse/AMATERASU-24?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506814#comment-16506814 ] 

ASF GitHub Bot commented on AMATERASU-24:
-----------------------------------------

arunma closed pull request #22: AMATERASU-24 Refactor Spark out of Amaterasu executor to its own pro…
URL: https://github.com/apache/incubator-amaterasu/pull/22

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/build.gradle b/build.gradle
index 79b926f..8b56e00 100644
--- a/build.gradle
+++ b/build.gradle
@@ -25,10 +25,6 @@ allprojects {
     version '0.2.0-incubating-rc3'
 }
 
-project(':leader')
-project(':common')
-project(':executor')
-
 task copyLeagalFiles(type: Copy) {
     from "./DISCLAIMER", "./LICENSE", "./NOTICE"
     into "${buildDir}/amaterasu"
diff --git a/executor/build.gradle b/executor/build.gradle
index 21bc2b0..790f287 100644
--- a/executor/build.gradle
+++ b/executor/build.gradle
@@ -54,7 +54,7 @@ dependencies {
 
     compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
     compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-    compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
+    //compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
     compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
     compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
     compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
@@ -75,18 +75,7 @@ dependencies {
     compile project(':common')
     compile project(':amaterasu-sdk')
 
-    //runtime dependency for spark
-    provided('org.apache.spark:spark-repl_2.11:2.2.1')
-    provided('org.apache.spark:spark-core_2.11:2.2.1')
-
-    testCompile project(':common')
-    testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
-    testRuntime 'org.pegdown:pegdown:1.1.0'
-    testCompile 'junit:junit:4.11'
-    testCompile 'org.scalatest:scalatest_2.11:3.0.2'
-    testCompile 'org.scala-lang:scala-library:2.11.8'
-    testCompile('org.apache.spark:spark-repl_2.11:2.2.1')
-    testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
+
 
 }
 
diff --git a/executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java b/executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java
deleted file mode 100755
index a521fce..0000000
--- a/executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark;
-
-import org.apache.amaterasu.executor.runtime.AmaContext;
-import org.apache.amaterasu.common.runtime.Environment;
-
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-
-import org.apache.spark.sql.SparkSession;
-import py4j.GatewayServer;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class PySparkEntryPoint {
-
-    //private static Boolean started = false;
-    private static PySparkExecutionQueue queue = new PySparkExecutionQueue();
-    private static ConcurrentHashMap<String, ResultQueue> resultQueues = new ConcurrentHashMap<>();
-
-    private static int port = 0;
-    private static SparkSession sparkSession = null;
-    private static JavaSparkContext jsc = null;
-    private static SQLContext sqlContext = null;
-    private static SparkEnv sparkEnv = null;
-
-    public static PySparkExecutionQueue getExecutionQueue() {
-        return queue;
-    }
-
-    public static ResultQueue getResultQueue(String actionName) {
-        resultQueues.putIfAbsent(actionName, new ResultQueue());
-        return resultQueues.get(actionName);
-    }
-
-    public static JavaSparkContext getJavaSparkContext() {
-        SparkEnv.set(sparkEnv);
-        return jsc;
-    }
-
-    public static String getJobId(){
-        return AmaContext.jobId();
-    }
-
-    public static Environment getEnv(){
-        return AmaContext.env();
-    }
-
-    public static SQLContext getSqlContext() {
-        SparkEnv.set(sparkEnv);
-        return sqlContext;
-    }
-
-    public static SparkSession getSparkSession() {
-        SparkEnv.set(sparkEnv);
-        return sparkSession;
-    }
-
-    public static SparkConf getSparkConf() {
-        return jsc.getConf();
-    }
-
-    private static void generatePort() {
-
-        try {
-
-            ServerSocket socket = new ServerSocket(0);
-            port = socket.getLocalPort();
-
-            socket.close();
-
-        } catch (IOException e) {
-        }
-
-    }
-
-    public static int getPort() {
-        return port;
-    }
-
-    public static void start(SparkSession spark,
-                             String jobName,
-                             Environment env,
-                             SparkEnv sparkEnv) {
-
-        AmaContext.init(spark, jobName, env);
-
-        sparkSession = spark;
-        jsc = new JavaSparkContext(spark.sparkContext());
-        sqlContext = spark.sqlContext();
-        PySparkEntryPoint.sparkEnv = sparkEnv;
-        generatePort();
-        GatewayServer gatewayServer = new GatewayServer(new PySparkEntryPoint(), port);
-
-        gatewayServer.start();
-    }
-}
diff --git a/executor/src/main/resources/codegen.py b/executor/src/main/resources/codegen.py
deleted file mode 100644
index 113d9be..0000000
--- a/executor/src/main/resources/codegen.py
+++ /dev/null
@@ -1,577 +0,0 @@
-"""
-    codegen
-    ~~~~~~~
-
-    Extension to ast that allow ast -> python code generation.
-
-    :copyright: Copyright 2008 by Armin Ronacher.
-    :license: BSD.
-"""
-from ast import *
-
-BINOP_SYMBOLS = {}
-BINOP_SYMBOLS[Add] = '+'
-BINOP_SYMBOLS[Sub] = '-'
-BINOP_SYMBOLS[Mult] = '*'
-BINOP_SYMBOLS[Div] = '/'
-BINOP_SYMBOLS[Mod] = '%'
-BINOP_SYMBOLS[Pow] = '**'
-BINOP_SYMBOLS[LShift] = '<<'
-BINOP_SYMBOLS[RShift] = '>>'
-BINOP_SYMBOLS[BitOr] = '|'
-BINOP_SYMBOLS[BitXor] = '^'
-BINOP_SYMBOLS[BitAnd] = '&'
-BINOP_SYMBOLS[FloorDiv] = '//'
-
-BOOLOP_SYMBOLS = {}
-BOOLOP_SYMBOLS[And] = 'and'
-BOOLOP_SYMBOLS[Or] = 'or'
-
-CMPOP_SYMBOLS = {}
-CMPOP_SYMBOLS[Eq] = '=='
-CMPOP_SYMBOLS[NotEq] = '!='
-CMPOP_SYMBOLS[Lt] = '<'
-CMPOP_SYMBOLS[LtE] = '<='
-CMPOP_SYMBOLS[Gt] = '>'
-CMPOP_SYMBOLS[GtE] = '>='
-CMPOP_SYMBOLS[Is] = 'is'
-CMPOP_SYMBOLS[IsNot] = 'is not'
-CMPOP_SYMBOLS[In] = 'in'
-CMPOP_SYMBOLS[NotIn] = 'not in'
-
-UNARYOP_SYMBOLS = {}
-UNARYOP_SYMBOLS[Invert] = '~'
-UNARYOP_SYMBOLS[Not] = 'not'
-UNARYOP_SYMBOLS[UAdd] = '+'
-UNARYOP_SYMBOLS[USub] = '-'
-
-
-def to_source(node, indent_with=' ' * 4, add_line_information=False):
-    """This function can convert a node tree back into python sourcecode.
-    This is useful for debugging purposes, especially if you're dealing with
-    custom asts not generated by python itself.
-
-    It could be that the sourcecode is evaluable when the AST itself is not
-    compilable / evaluable.  The reason for this is that the AST contains some
-    more data than regular sourcecode does, which is dropped during
-    conversion.
-
-    Each level of indentation is replaced with `indent_with`.  Per default this
-    parameter is equal to four spaces as suggested by PEP 8, but it might be
-    adjusted to match the application's styleguide.
-
-    If `add_line_information` is set to `True` comments for the line numbers
-    of the nodes are added to the output.  This can be used to spot wrong line
-    number information of statement nodes.
-    """
-    generator = SourceGenerator(indent_with, add_line_information)
-    generator.visit(node)
-
-    return ''.join(generator.result)
-
-class SourceGenerator(NodeVisitor):
-    """This visitor is able to transform a well formed syntax tree into python
-    sourcecode.  For more details have a look at the docstring of the
-    `node_to_source` function.
-    """
-
-    def __init__(self, indent_with, add_line_information=False):
-        self.result = []
-        self.indent_with = indent_with
-        self.add_line_information = add_line_information
-        self.indentation = 0
-        self.new_lines = 0
-
-    def write(self, x):
-        if self.new_lines:
-            if self.result:
-                self.result.append('\n' * self.new_lines)
-            self.result.append(self.indent_with * self.indentation)
-            self.new_lines = 0
-        self.result.append(x)
-
-    def newline(self, node=None, extra=0):
-        self.new_lines = max(self.new_lines, 1 + extra)
-        if node is not None and self.add_line_information:
-            self.write('# line: %s' % node.lineno)
-            self.new_lines = 1
-
-    def body(self, statements):
-        self.new_line = True
-        self.indentation += 1
-        for stmt in statements:
-            self.visit(stmt)
-        self.indentation -= 1
-
-    def body_or_else(self, node):
-        self.body(node.body)
-        if node.orelse:
-            self.newline()
-            self.write('else:')
-            self.body(node.orelse)
-
-    def signature(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        padding = [None] * (len(node.args) - len(node.defaults))
-        for arg, default in zip(node.args, padding + node.defaults):
-            write_comma()
-            self.visit(arg)
-            if default is not None:
-                self.write('=')
-                self.visit(default)
-        if node.vararg is not None:
-            write_comma()
-            self.write('*' + node.vararg)
-        if node.kwarg is not None:
-            write_comma()
-            self.write('**' + node.kwarg)
-
-    def decorators(self, node):
-        for decorator in node.decorator_list:
-            self.newline(decorator)
-            self.write('@')
-            self.visit(decorator)
-
-    # Statements
-
-    def visit_Assert(self, node):
-        self.newline(node)
-        self.write('assert ')
-        self.visit(node.test)
-        if node.msg is not None:
-           self.write(', ')
-           self.visit(node.msg)
-
-    def visit_Assign(self, node):
-        self.newline(node)
-        for idx, target in enumerate(node.targets):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-        self.write(' = ')
-        self.visit(node.value)
-
-    def visit_AugAssign(self, node):
-        self.newline(node)
-        self.visit(node.target)
-        self.write(' ' + BINOP_SYMBOLS[type(node.op)] + '= ')
-        self.visit(node.value)
-
-    def visit_ImportFrom(self, node):
-        self.newline(node)
-        self.write('from %s%s import ' % ('.' * node.level, node.module))
-        for idx, item in enumerate(node.names):
-            if idx:
-                self.write(', ')
-            self.write(item)
-
-    def visit_Import(self, node):
-        self.newline(node)
-        for item in node.names:
-            self.write('import ')
-            self.visit(item)
-
-    def visit_Expr(self, node):
-        self.newline(node)
-        self.generic_visit(node)
-
-    def visit_FunctionDef(self, node):
-        self.newline(extra=1)
-        self.decorators(node)
-        self.newline(node)
-        self.write('def %s(' % node.name)
-        self.visit(node.args)
-        self.write('):')
-        self.body(node.body)
-
-    def visit_ClassDef(self, node):
-        have_args = []
-        def paren_or_comma():
-            if have_args:
-                self.write(', ')
-            else:
-                have_args.append(True)
-                self.write('(')
-
-        self.newline(extra=2)
-        self.decorators(node)
-        self.newline(node)
-        self.write('class %s' % node.name)
-        for base in node.bases:
-            paren_or_comma()
-            self.visit(base)
-        # XXX: the if here is used to keep this module compatible
-        #      with python 2.6.
-        if hasattr(node, 'keywords'):
-            for keyword in node.keywords:
-                paren_or_comma()
-                self.write(keyword.arg + '=')
-                self.visit(keyword.value)
-            if node.starargs is not None:
-                paren_or_comma()
-                self.write('*')
-                self.visit(node.starargs)
-            if node.kwargs is not None:
-                paren_or_comma()
-                self.write('**')
-                self.visit(node.kwargs)
-        self.write(have_args and '):' or ':')
-        self.body(node.body)
-
-    def visit_If(self, node):
-        self.newline(node)
-        self.write('if ')
-        self.visit(node.test)
-        self.write(':')
-        self.body(node.body)
-        while True:
-            else_ = node.orelse
-            if len(else_) == 0:
-                break
-            elif len(else_) == 1 and isinstance(else_[0], If):
-                node = else_[0]
-                self.newline()
-                self.write('elif ')
-                self.visit(node.test)
-                self.write(':')
-                self.body(node.body)
-            else:
-                self.newline()
-                self.write('else:')
-                self.body(else_)
-                break
-
-    def visit_For(self, node):
-        self.newline(node)
-        self.write('for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_While(self, node):
-        self.newline(node)
-        self.write('while ')
-        self.visit(node.test)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_With(self, node):
-        self.newline(node)
-        self.write('with ')
-        self.visit(node.context_expr)
-        if node.optional_vars is not None:
-            self.write(' as ')
-            self.visit(node.optional_vars)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_Pass(self, node):
-        self.newline(node)
-        self.write('pass')
-
-    def visit_Print(self, node):
-        # XXX: python 2.6 only
-        self.newline(node)
-        self.write('print ')
-        want_comma = False
-        if node.dest is not None:
-            self.write(' >> ')
-            self.visit(node.dest)
-            want_comma = True
-        for value in node.values:
-            if want_comma:
-                self.write(', ')
-            self.visit(value)
-            want_comma = True
-        if not node.nl:
-            self.write(',')
-
-    def visit_Delete(self, node):
-        self.newline(node)
-        self.write('del ')
-        for idx, target in enumerate(node):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-
-    def visit_TryExcept(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        for handler in node.handlers:
-            self.visit(handler)
-
-    def visit_TryFinally(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        self.newline(node)
-        self.write('finally:')
-        self.body(node.finalbody)
-
-    def visit_Global(self, node):
-        self.newline(node)
-        self.write('global ' + ', '.join(node.names))
-
-    def visit_Nonlocal(self, node):
-        self.newline(node)
-        self.write('nonlocal ' + ', '.join(node.names))
-
-    def visit_Return(self, node):
-        self.newline(node)
-        if node.value is None:
-            self.write('return')
-        else:
-            self.write('return ')
-            self.visit(node.value)
-
-    def visit_Break(self, node):
-        self.newline(node)
-        self.write('break')
-
-    def visit_Continue(self, node):
-        self.newline(node)
-        self.write('continue')
-
-    def visit_Raise(self, node):
-        # XXX: Python 2.6 / 3.0 compatibility
-        self.newline(node)
-        self.write('raise')
-        if hasattr(node, 'exc') and node.exc is not None:
-            self.write(' ')
-            self.visit(node.exc)
-            if node.cause is not None:
-                self.write(' from ')
-                self.visit(node.cause)
-        elif hasattr(node, 'type') and node.type is not None:
-            self.visit(node.type)
-            if node.inst is not None:
-                self.write(', ')
-                self.visit(node.inst)
-            if node.tback is not None:
-                self.write(', ')
-                self.visit(node.tback)
-
-    # Expressions
-
-    def visit_Attribute(self, node):
-        self.visit(node.value)
-        self.write('.' + node.attr)
-
-    def visit_Call(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        self.visit(node.func)
-        self.write('(')
-        for arg in node.args:
-            write_comma()
-            self.visit(arg)
-        for keyword in node.keywords:
-            write_comma()
-            self.write(keyword.arg + '=')
-            self.visit(keyword.value)
-        if node.starargs is not None:
-            write_comma()
-            self.write('*')
-            self.visit(node.starargs)
-        if node.kwargs is not None:
-            write_comma()
-            self.write('**')
-            self.visit(node.kwargs)
-        self.write(')')
-
-    def visit_Name(self, node):
-        self.write(node.id)
-
-    def visit_Str(self, node):
-        self.write(repr(node.s))
-
-    def visit_Bytes(self, node):
-        self.write(repr(node.s))
-
-    def visit_Num(self, node):
-        self.write(repr(node.n))
-
-    def visit_Tuple(self, node):
-        self.write('(')
-        idx = -1
-        for idx, item in enumerate(node.elts):
-            if idx:
-                self.write(', ')
-            self.visit(item)
-        self.write(idx and ')' or ',)')
-
-    def sequence_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            for idx, item in enumerate(node.elts):
-                if idx:
-                    self.write(', ')
-                self.visit(item)
-            self.write(right)
-        return visit
-
-    visit_List = sequence_visit('[', ']')
-    visit_Set = sequence_visit('{', '}')
-    del sequence_visit
-
-    def visit_Dict(self, node):
-        self.write('{')
-        for idx, (key, value) in enumerate(zip(node.keys, node.values)):
-            if idx:
-                self.write(', ')
-            self.visit(key)
-            self.write(': ')
-            self.visit(value)
-        self.write('}')
-
-    def visit_BinOp(self, node):
-        self.visit(node.left)
-        self.write(' %s ' % BINOP_SYMBOLS[type(node.op)])
-        self.visit(node.right)
-
-    def visit_BoolOp(self, node):
-        self.write('(')
-        for idx, value in enumerate(node.values):
-            if idx:
-                self.write(' %s ' % BOOLOP_SYMBOLS[type(node.op)])
-            self.visit(value)
-        self.write(')')
-
-    def visit_Compare(self, node):
-        self.write('(')
-        self.visit(node.left)
-        for op, right in zip(node.ops, node.comparators):
-            self.write(' %s ' % CMPOP_SYMBOLS[type(op)])
-            self.visit(right)
-        self.write(')')
-
-    def visit_UnaryOp(self, node):
-        self.write('(')
-        op = UNARYOP_SYMBOLS[type(node.op)]
-        self.write(op)
-        if op == 'not':
-            self.write(' ')
-        self.visit(node.operand)
-        self.write(')')
-
-    def visit_Subscript(self, node):
-        self.visit(node.value)
-        self.write('[')
-        self.visit(node.slice)
-        self.write(']')
-
-    def visit_Slice(self, node):
-        if node.lower is not None:
-            self.visit(node.lower)
-        self.write(':')
-        if node.upper is not None:
-            self.visit(node.upper)
-        if node.step is not None:
-            self.write(':')
-            if not (isinstance(node.step, Name) and node.step.id == 'None'):
-                self.visit(node.step)
-
-    def visit_ExtSlice(self, node):
-        for idx, item in node.dims:
-            if idx:
-                self.write(', ')
-            self.visit(item)
-
-    def visit_Yield(self, node):
-        self.write('yield ')
-        self.visit(node.value)
-
-    def visit_Lambda(self, node):
-        self.write('lambda ')
-        self.visit(node.args)
-        self.write(': ')
-        self.visit(node.body)
-
-    def visit_Ellipsis(self, node):
-        self.write('Ellipsis')
-
-    def generator_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            self.visit(node.elt)
-            for comprehension in node.generators:
-                self.visit(comprehension)
-            self.write(right)
-        return visit
-
-    visit_ListComp = generator_visit('[', ']')
-    visit_GeneratorExp = generator_visit('(', ')')
-    visit_SetComp = generator_visit('{', '}')
-    del generator_visit
-
-    def visit_DictComp(self, node):
-        self.write('{')
-        self.visit(node.key)
-        self.write(': ')
-        self.visit(node.value)
-        for comprehension in node.generators:
-            self.visit(comprehension)
-        self.write('}')
-
-    def visit_IfExp(self, node):
-        self.visit(node.body)
-        self.write(' if ')
-        self.visit(node.test)
-        self.write(' else ')
-        self.visit(node.orelse)
-
-    def visit_Starred(self, node):
-        self.write('*')
-        self.visit(node.value)
-
-    def visit_Repr(self, node):
-        # XXX: python 2.6 only
-        self.write('`')
-        self.visit(node.value)
-        self.write('`')
-
-    # Helper Nodes
-
-    def visit_alias(self, node):
-        self.write(node.name)
-        if node.asname is not None:
-            self.write(' as ' + node.asname)
-
-    def visit_comprehension(self, node):
-        self.write(' for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        if node.ifs:
-            for if_ in node.ifs:
-                self.write(' if ')
-                self.visit(if_)
-
-    def visit_excepthandler(self, node):
-        self.newline(node)
-        self.write('except')
-        if node.type is not None:
-            self.write(' ')
-            self.visit(node.type)
-            if node.name is not None:
-                self.write(' as ')
-                self.visit(node.name)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_arguments(self, node):
-        self.signature(node)
diff --git a/executor/src/main/resources/runtime.py b/executor/src/main/resources/runtime.py
deleted file mode 100644
index d01664c..0000000
--- a/executor/src/main/resources/runtime.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
-    def __init__(self, sc, spark, job_id, env):
-        self.sc = sc
-        self.spark = spark
-        self.job_id = job_id
-        self.env = env
-
-    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
-        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
-    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
-        self.name = name
-        self.master = master
-        self.input_root_path = input_root_path
-        self.output_root_path = output_root_path
-        self.working_dir = working_dir
-        self.configuration = configuration
diff --git a/executor/src/main/resources/spark-version-info.properties b/executor/src/main/resources/spark-version-info.properties
deleted file mode 100644
index ce0b312..0000000
--- a/executor/src/main/resources/spark-version-info.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-version=2.1.0-SNAPSHOT
-
-user=root
-
-revision=738b4cc548ca48c010b682b8bc19a2f7e1947cfe
-
-branch=master
-
-date=2016-07-27T11:23:21Z
-
-url=https://github.com/apache/spark.git
diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py
deleted file mode 100755
index 0faae2b..0000000
--- a/executor/src/main/resources/spark_intp.py
+++ /dev/null
@@ -1,109 +0,0 @@
-#!/usr/bin/python
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import ast
-import codegen
-import os
-import sys
-import zipimport
-from runtime import AmaContext, Environment
-
-# os.chdir(os.getcwd() + '/build/resources/test/')
-# import zipfile
-# zip = zipfile.ZipFile('pyspark.zip')
-# zip.extractall()
-# zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
-# zip.extractall()
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
-
-# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
-# py4j_importer = zipimport.zipimporter(py4j_path)
-# py4j = py4j_importer.load_module('py4j')
-from py4j.java_gateway import JavaGateway, GatewayClient, java_import
-from py4j.protocol import Py4JJavaError
-from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
-from pyspark.rdd import RDD
-from pyspark.files import SparkFiles
-from pyspark.storagelevel import StorageLevel
-from pyspark import accumulators
-from pyspark.accumulators import Accumulator, AccumulatorParam
-from pyspark.broadcast import Broadcast
-from pyspark.serializers import MarshalSerializer, PickleSerializer
-from pyspark.sql import SparkSession
-from pyspark.sql import Row
-
-client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client, auto_convert=True)
-entry_point = gateway.entry_point
-queue = entry_point.getExecutionQueue()
-
-java_import(gateway.jvm, "org.apache.spark.SparkEnv")
-java_import(gateway.jvm, "org.apache.spark.SparkConf")
-java_import(gateway.jvm, "org.apache.spark.api.java.*")
-java_import(gateway.jvm, "org.apache.spark.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.sql.*")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-java_import(gateway.jvm, "scala.Tuple2")
-
-jconf = entry_point.getSparkConf()
-jsc = entry_point.getJavaSparkContext()
-
-job_id = entry_point.getJobId()
-javaEnv = entry_point.getEnv()
-
-env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
-conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
-conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
-sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-spark = SparkSession(sc, entry_point.getSparkSession())
-
-ama_context = AmaContext(sc, spark, job_id, env)
-
-while True:
-    actionData = queue.getNext()
-    resultQueue = entry_point.getResultQueue(actionData._2())
-    actionSource = actionData._1()
-    tree = ast.parse(actionSource)
-    exports = actionData._3()
-
-    for node in tree.body:
-
-        wrapper = ast.Module(body=[node])
-        try:
-            co = compile(wrapper, "<ast>", 'exec')
-            exec (co)
-            resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
-
-            #if this node is an assignment, we need to check if it needs to be persisted
-            try:
-                persistCode = ''
-                if(isinstance(node,ast.Assign)):
-                    varName = node.targets[0].id
-                    if(exports.containsKey(varName)):
-                        persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
-                        persist = compile(persistCode, '<stdin>', 'exec')
-                        exec(persist)
-
-            except:
-                resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
-        except:
-            resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
-    resultQueue.put('completion', '', '', '')
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala
deleted file mode 100755
index 411069a..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-
-import java.util
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-
-class PySparkExecutionQueue {
-
-  val queue = new LinkedBlockingQueue[(String, String, util.Map[String, String])]()
-
-  def getNext(): (String, String, util.Map[String, String]) = {
-
-    // if the queue is idle for an hour it will return null which
-    // terminates the python execution, need to revisit
-    queue.poll(1, TimeUnit.HOURS)
-
-  }
-
-  def setForExec(line: (String, String, util.Map[String, String])) = {
-
-    queue.put(line)
-
-  }
-
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala
deleted file mode 100755
index 6dbd445..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-
-import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.ResultType.ResultType
-
-object ResultType extends Enumeration {
-  type ResultType = Value
-  val success = Value("success")
-  val error = Value("error")
-  val completion = Value("completion")
-}
-
-case class PySparkResult(
-  resultType: ResultType,
-  action: String,
-  statement: String,
-  message: String
-)
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
deleted file mode 100755
index 94b8056..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-
-import java.io.{File, PrintWriter, StringWriter}
-import java.util
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{PythonDependencies, PythonPackage}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.SparkSession
-
-import scala.sys.process.Process
-
-
-class PySparkRunner extends AmaterasuRunner with Logging {
-
-  var proc: Process = _
-  var notifier: Notifier = _
-
-  override def getIdentifier: String = "pyspark"
-
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-    interpretSources(actionSource, actionName, exports)
-  }
-
-  def interpretSources(source: String, actionName: String, exports: util.Map[String, String]): Unit = {
-
-    PySparkEntryPoint.getExecutionQueue.setForExec((source, actionName, exports))
-    val resQueue = PySparkEntryPoint.getResultQueue(actionName)
-
-    notifier.info(s"================= started action $actionName =================")
-
-    var res: PySparkResult = null
-
-    do {
-      res = resQueue.getNext()
-      res.resultType match {
-        case ResultType.success =>
-          notifier.success(res.statement)
-        case ResultType.error =>
-          notifier.error(res.statement, res.message)
-          throw new Exception(res.message)
-        case ResultType.completion =>
-          notifier.info(s"================= finished action $actionName =================")
-      }
-    } while (res != null && res.resultType != ResultType.completion)
-  }
-
-}
-
-object PySparkRunner {
-
-  def apply(env: Environment,
-            jobId: String,
-            notifier: Notifier,
-            spark: SparkSession,
-            pypath: String,
-            pyDeps: PythonDependencies,
-            config: ClusterConfig): PySparkRunner = {
-
-    //TODO: can we make this less ugly?
-    var pysparkPython = "/usr/bin/python"
-
-    if (pyDeps != null &&
-        pyDeps.packages.nonEmpty) {
-      loadPythonDependencies(pyDeps, notifier)
-      pysparkPython = "miniconda/bin/python"
-    }
-
-    val result = new PySparkRunner
-
-    PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
-    val port = PySparkEntryPoint.getPort
-    var intpPath = ""
-    if (env.configuration.contains("cwd")) {
-      val cwd = new File(env.configuration("cwd"))
-      intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support test environment
-    } else {
-      intpPath = s"spark_intp.py"
-    }
-    var pysparkPath = ""
-    if (env.configuration.contains("pysparkPath")) {
-      pysparkPath = env.configuration("pysparkPath")
-    } else {
-      pysparkPath = s"${config.spark.home}/bin/spark-submit"
-    }
-    val proc = Process(Seq(pysparkPath, intpPath, port.toString), None,
-      "PYTHONPATH" -> pypath,
-      "PYSPARK_PYTHON" -> pysparkPython,
-      "PYTHONHASHSEED" -> 0.toString) #> System.out
-
-    proc.run()
-
-
-    result.notifier = notifier
-
-    result
-  }
-
-  /**
-    * This installs the required python dependencies.
-    * We basically need 2 packages to make pyspark work with customer's scripts:
-    * 1. py4j - supplied by spark, for communication between Python and Java runtimes.
-    * 2. codegen - for dynamically parsing and converting customer's scripts into executable Python code objects.
-    * Currently we only know how to install packages using Anaconda, the reason is 3rd party OS libraries, e.g. libevent
-    * Anaconda has the capabilities to automatically resolve the required OS libraries per Python package and install them.
-    *
-    * TODO - figure out if we really want to support pip directly, or if Anaconda is enough.
-    * @param deps All of the customer's supplied Python dependencies, this currently comes from job-repo/deps/python.yml
-    * @param notifier
-    */
-  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
-    notifier.info("loading anaconda evn")
-    installAnacondaOnNode()
-    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
-    installAnacondaPackage(codegenPackage)
-    try {
-      deps.packages.foreach(pack => {
-        pack.index.getOrElse("anaconda").toLowerCase match {
-          case "anaconda" => installAnacondaPackage(pack)
-          // case "pypi" => installPyPiPackage(pack) TODO: See if we can support this
-        }
-      })
-    }
-    catch {
-
-      case rte: RuntimeException =>
-        val sw = new StringWriter
-        rte.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
-      case e: Exception =>
-        val sw = new StringWriter
-        e.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
-    }
-  }
-
-
-  /**
-    * Installs one python package using Anaconda.
-    * Anaconda works with multiple channels, or better called, repositories.
-    * Normally, if a channel isn't specified, Anaconda will fetch the package from the default conda channel.
-    * The reason we need to use channels, is that sometimes the required package doesn't exist on the default channel.
-    * @param pythonPackage This comes from parsing the python.yml dep file.
-    */
-  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
-    val channel = pythonPackage.channel.getOrElse("anaconda")
-    if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}")
-    } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}")
-    }
-  }
-
-  /**
-    * Installs Anaconda and then links it with the local spark that was installed on the executor.
-    */
-  private def installAnacondaOnNode(): Unit = {
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda")
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build")
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
-  }
-
-
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala
deleted file mode 100755
index 3ac7bd7..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-
-class ResultQueue {
-  val queue = new LinkedBlockingQueue[PySparkResult]()
-
-  def getNext(): PySparkResult = {
-
-    // if the queue is idle for an hour it will return null which
-    // terminates the python execution, need to revisit
-    queue.poll(10, TimeUnit.MINUTES)
-
-  }
-
-  def put(
-    resultType: String,
-    action: String,
-    statement: String,
-    message: String
-  ) = {
-
-    val result = new PySparkResult(ResultType.withName(resultType), action, statement, message)
-    queue.put(result)
-  }
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala
deleted file mode 100644
index d111cfb..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark
-
-import java.io.ByteArrayOutputStream
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.SparkContext
-
-
-class SparkRRunner extends Logging with AmaterasuRunner {
-
-  override def getIdentifier = "spark-r"
-
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-  }
-}
-
-object SparkRRunner {
-  def apply(
-    env: Environment,
-    jobId: String,
-    sparkContext: SparkContext,
-    outStream: ByteArrayOutputStream,
-    notifier: Notifier,
-    jars: Seq[String]
-  ): SparkRRunner = {
-    new SparkRRunner()
-  }
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
deleted file mode 100644
index ff56d8c..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark
-
-import java.io._
-
-import com.jcabi.aether.Aether
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ExecData
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner
-import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner
-import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
-import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner}
-import org.eclipse.aether.util.artifact.JavaScopes
-import org.sonatype.aether.repository.RemoteRepository
-import org.sonatype.aether.util.artifact.DefaultArtifact
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.concurrent.TrieMap
-import scala.sys.process._
-
-class SparkRunnersProvider extends RunnersProvider with Logging {
-
-  private val runners = new TrieMap[String, AmaterasuRunner]
-  private var shellLoger = ProcessLogger(
-    (o: String) => log.info(o),
-    (e: String) => log.error(e)
-
-  )
-  private var conf: Option[Map[String, Any]] = _
-  private var executorEnv: Option[Map[String, Any]] = _
-
-  override def init(execData: ExecData,
-                    jobId: String,
-                    outStream: ByteArrayOutputStream,
-                    notifier: Notifier,
-                    executorId: String,
-                    config: ClusterConfig,
-                    hostName: String): Unit = {
-
-    shellLoger = ProcessLogger(
-      (o: String) => log.info(o),
-      (e: String) => log.error("", e)
-    )
-
-    var jars = Seq.empty[String]
-
-    if (execData.deps != null) {
-      jars ++= getDependencies(execData.deps)
-    }
-
-    if (execData.pyDeps != null &&
-      execData.pyDeps.packages.nonEmpty) {
-      loadPythonDependencies(execData.pyDeps, notifier)
-    }
-
-    conf = execData.configurations.get("spark")
-    executorEnv = execData.configurations.get("spark_exec_env")
-    val sparkAppName = s"job_${jobId}_executor_$executorId"
-
-    SparkRunnerHelper.notifier = notifier
-    val spark = SparkRunnerHelper.createSpark(execData.env, sparkAppName, jars, conf, executorEnv, config, hostName)
-
-    lazy val sparkScalaRunner = SparkScalaRunner(execData.env, jobId, spark, outStream, notifier, jars)
-    sparkScalaRunner.initializeAmaContext(execData.env)
-
-    runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
-
-    // TODO: get rid of hard-coded version
-    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", execData.pyDeps, config)
-    runners.put(pySparkRunner.getIdentifier, pySparkRunner)
-
-    lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
-    runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
-  }
-
-  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
-    val channel = pythonPackage.channel.getOrElse("anaconda")
-    if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
-    } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
-    }
-  }
-
-  private def installAnacondaOnNode(): Unit = {
-    // TODO: get rid of hard-coded version
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger
-  }
-
-  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
-    notifier.info("loading anaconda evn")
-    installAnacondaOnNode()
-    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
-    installAnacondaPackage(codegenPackage)
-    try {
-      // notifier.info("loadPythonDependencies #5")
-      deps.packages.foreach(pack => {
-        pack.index.getOrElse("anaconda").toLowerCase match {
-          case "anaconda" => installAnacondaPackage(pack)
-          // case "pypi" => installPyPiPackage(pack)
-        }
-      })
-    }
-    catch {
-
-      case rte: RuntimeException =>
-        val sw = new StringWriter
-        rte.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
-      case e: Exception =>
-        val sw = new StringWriter
-        e.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
-    }
-  }
-
-  override def getGroupIdentifier: String = "spark"
-
-  override def getRunner(id: String): AmaterasuRunner = runners(id)
-
-  private def getDependencies(deps: Dependencies): Seq[String] = {
-
-    // adding a local repo because Aether needs one
-    val repo = new File(System.getProperty("java.io.tmpdir"), "ama-repo")
-
-    val remotes = deps.repos.map(r =>
-      new RemoteRepository(
-        r.id,
-        r.`type`,
-        r.url
-      )).toList.asJava
-
-    val aether = new Aether(remotes, repo)
-
-    deps.artifacts.flatMap(a => {
-      aether.resolve(
-        new DefaultArtifact(a.groupId, a.artifactId, "", "jar", a.version),
-        JavaScopes.RUNTIME
-      ).map(a => a)
-    }).map(x => x.getFile.getAbsolutePath)
-
-  }
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
deleted file mode 100644
index 350ddb4..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql
-
-import java.io.File
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.runtime.AmaContext
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.commons.io.FilenameUtils
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-import scala.collection.JavaConverters._
-
-/**
-  * Amaterasu currently supports JSON and PARQUET as data sources.
-  * CSV data source support will be provided in later versions.
-  */
-class SparkSqlRunner extends Logging with AmaterasuRunner {
-  var env: Environment = _
-  var notifier: Notifier = _
-  var jobId: String = _
-  //var actionName: String = _
-  var spark: SparkSession = _
-
-  /*
-  Method: executeSource
-  Description: when the user specifies a query in the Amaterasu format, this method parses and executes it.
-               Queries that are not in the Amaterasu format are executed directly.
-  @Params: query string
-   */
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-
-    notifier.info(s"================= started action $actionName =================")
-
-    if (!actionSource.isEmpty) {
-
-      var result: DataFrame = null
-      if (actionSource.toLowerCase.contains("amacontext")) {
-
-        //Parse the incoming query
-        //notifier.info(s"================= parsing the SQL query =================")
-
-        val parser: List[String] = actionSource.toLowerCase.split(" ").toList
-        var sqlPart1: String = ""
-        var sqlPart2: String = ""
-        var queryTempLen: Int = 0
-
-        //get only the sql part of the query
-        for (i <- 0 to parser.indexOf("from")) {
-          sqlPart1 += parser(i) + " "
-        }
-
-        if (parser.indexOf("readas") == -1) {
-          queryTempLen = parser.length - 1
-        }
-        else
-          queryTempLen = parser.length - 3
-
-        for (i <- parser.indexOf("from") + 1 to queryTempLen) {
-          if (!parser(i).contains("amacontext"))
-            sqlPart2 += " " + parser(i)
-        }
-
-        //If no read format is specified by the user, use PARQUET as the default file format to load data
-        var fileFormat: String = null
-        //if there is no index for "readas" keyword, then set PARQUET as default read format
-        if (parser.indexOf("readas") == -1) {
-          fileFormat = "parquet"
-        }
-        else
-          fileFormat = parser(parser.indexOf("readas") + 1)
-
-
-        val locationPath: String = parser.filter(word => word.contains("amacontext")).mkString("")
-        val directories = locationPath.split("_")
-        val actionName = directories(1)
-        val dfName = directories(2)
-        val parsedQuery = sqlPart1 + locationPath + sqlPart2
-
-        //Load the dataframe from previous action
-        val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat)
-        loadData.createOrReplaceTempView(locationPath)
-
-
-        try{
-
-        result = spark.sql(parsedQuery)
-        notifier.success(parsedQuery)
-        } catch {
-          case e: Exception => notifier.error(parsedQuery, e.getMessage)
-        }
-
-      }
-      else {
-
-        notifier.info("Executing SparkSql on: " + actionSource)
-
-        result = spark.sql(actionSource)
-      }
-      val exportsBuff = exports.asScala.toBuffer
-      if (exportsBuff.nonEmpty) {
-        val exportName = exportsBuff.head._1
-        val exportFormat = exportsBuff.head._2
-        //notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName")
-        result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.workingDir}/$jobId/$actionName/$exportName")
-      }
-      notifier.info(s"================= finished action $actionName =================")
-    }
-  }
-
-  /*
-  Method to find the file types (extensions) of the files within a directory
-  @Params
-  folderName : Path to the directory containing the data-source files
-  */
-
-  def findFileType(folderName: File): Array[String] = {
-    // get all the files from a directory
-    val files: Array[File] = folderName.listFiles()
-    val extensions: Array[String] = files.map(file => FilenameUtils.getExtension(file.toString))
-    extensions
-  }
-
-  override def getIdentifier: String = "sql"
-
-}
-
-object SparkSqlRunner {
-
-  def apply(env: Environment,
-            jobId: String,
-            // actionName: String,
-            notifier: Notifier,
-            spark: SparkSession): SparkSqlRunner = {
-
-    val sparkSqlRunnerObj = new SparkSqlRunner
-
-    sparkSqlRunnerObj.env = env
-    sparkSqlRunnerObj.jobId = jobId
-    //sparkSqlRunnerObj.actionName = actionName
-    sparkSqlRunnerObj.notifier = notifier
-    sparkSqlRunnerObj.spark = spark
-    sparkSqlRunnerObj
-  }
-}
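
To make the string parsing in the removed runner concrete: table references of the form amacontext_<previousActionName>_<dataframeName> were resolved through AmaContext, and an optional trailing "readas <format>" overrode the default parquet load. A minimal sketch of driving it, assuming env, jobId, notifier and spark are already in scope and with illustrative action/dataframe names:

    import scala.collection.JavaConverters._

    val sqlRunner = SparkSqlRunner(env, jobId, notifier, spark)

    // loads the "odd" dataframe persisted by the earlier "start" action,
    // runs the query, and exports the result as parquet under
    // ${env.workingDir}/$jobId/sql_action/result
    sqlRunner.executeSource(
      "select * from amacontext_start_odd where age > 20 readas parquet",
      "sql_action",
      Map("result" -> "parquet").asJava)
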
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
index 9ab75be..0acd1fe 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
@@ -26,7 +26,6 @@ import org.apache.amaterasu.executor.common.executors.ProvidersFactory
 import org.apache.mesos.Protos._
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver}
-import org.apache.spark.SparkContext
 
 import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -37,7 +36,6 @@ class MesosActionsExecutor extends Executor with Logging {
 
   var master: String = _
   var executorDriver: ExecutorDriver = _
-  var sc: SparkContext = _
   var jobId: String = _
   var actionName: String = _
   //  var sparkScalaRunner: SparkScalaRunner = _
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala b/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala
deleted file mode 100755
index a61cd5a..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.runtime
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-
-object AmaContext extends Logging {
-
-  var spark: SparkSession = _
-  var sc: SparkContext = _
-  var jobId: String = _
-  var env: Environment = _
-
-  def init(spark: SparkSession,
-           jobId: String,
-           env: Environment): Unit = {
-
-    AmaContext.spark = spark
-    AmaContext.sc = spark.sparkContext
-    AmaContext.jobId = jobId
-    AmaContext.env = env
-
-  }
-
-  def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = {
-
-    spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName")
-
-  }
-
-  def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = {
-
-    getDataFrame(actionName, dfName, format).as[T]
-
-  }
-
-}
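
In action code this facade was what stitched actions together: a downstream action loaded whatever an upstream action had exported. A minimal usage sketch, with action and dataframe names chosen purely for illustration:

    import org.apache.spark.sql.DataFrame

    // load the "odd" dataframe that the "start" action exported as parquet
    val odd: DataFrame = AmaContext.getDataFrame("start", "odd")
    odd.createOrReplaceTempView("odd_numbers")

    // typed variant; needs an implicit Encoder in scope
    case class Person(name: String, age: Long)
    val sparkSession = AmaContext.spark
    import sparkSession.implicits._
    val people = AmaContext.getDataset[Person]("start", "people", format = "json")
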
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index f4f553c..b5f8700 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -19,24 +19,18 @@ package org.apache.amaterasu.executor.yarn.executors
 import java.io.ByteArrayOutputStream
 import java.net.{InetAddress, URLDecoder}
 
-import scala.collection.JavaConverters._
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.executor.common.executors.{ActiveNotifier, ProvidersFactory}
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.spark.SparkContext
 
-import scala.reflect.internal.util.ScalaClassLoader
-import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader
+import scala.collection.JavaConverters._
 
 
 class ActionsExecutor extends Logging {
 
   var master: String = _
-  var sc: SparkContext = _
   var jobId: String = _
   var actionName: String = _
   var taskData: TaskData = _
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala
deleted file mode 100755
index 19ef3de..0000000
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.repl.amaterasu
-
-import java.io.PrintWriter
-
-import org.apache.spark.repl.SparkILoop
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.IMain
-
-class AmaSparkILoop(writer: PrintWriter) extends SparkILoop(None, writer) {
-
-  def create = {
-    this.createInterpreter
-  }
-
-  def setSettings(settings: Settings) = {
-    this.settings = settings
-  }
-
-  def getIntp: IMain = {
-    this.intp
-  }
-
-}
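
The wrapper above exists only because createInterpreter, settings and intp are protected on SparkILoop; the removed SparkRunnerHelper (next hunk) drove it roughly as follows, with outStream and settings assumed to be prepared by the caller:

    import java.io.PrintWriter
    import scala.tools.nsc.interpreter.IMain

    val out  = new PrintWriter(outStream)   // outStream: java.io.ByteArrayOutputStream
    val loop = new AmaSparkILoop(out)
    loop.setSettings(settings)              // settings: scala.tools.nsc.GenericRunnerSettings
    loop.create                             // invokes the protected createInterpreter
    val intp: IMain = loop.getIntp
    intp.setContextClassLoader
    intp.initializeSynchronous
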
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
deleted file mode 100644
index 0bf7337..0000000
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.repl.amaterasu.runners.spark
-
-import java.io.{ByteArrayOutputStream, File, PrintWriter}
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.common.utils.FileUtils
-import org.apache.spark.repl.amaterasu.AmaSparkILoop
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
-
-import scala.tools.nsc.GenericRunnerSettings
-import scala.tools.nsc.interpreter.IMain
-
-object SparkRunnerHelper extends Logging {
-
-  private val conf = new SparkConf()
-  private val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
-  private val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
-
-  private var sparkSession: SparkSession = _
-
-  var notifier: Notifier = _
-
-  private var interpreter: IMain = _
-
-  def getNode: String = sys.env.get("AMA_NODE") match {
-    case None => "127.0.0.1"
-    case _ => sys.env("AMA_NODE")
-  }
-
-  def getOrCreateScalaInterperter(outStream: ByteArrayOutputStream, jars: Seq[String], recreate: Boolean = false): IMain = {
-    if (interpreter == null || recreate) {
-      initInterpreter(outStream, jars)
-    }
-    interpreter
-  }
-
-  private def scalaOptionError(msg: String): Unit = {
-    notifier.error("", msg)
-  }
-
-  private def initInterpreter(outStream: ByteArrayOutputStream, jars: Seq[String]): Unit = {
-
-    var result: IMain = null
-    val config = new ClusterConfig()
-    try {
-
-      val interpArguments = List(
-        "-Yrepl-class-based",
-        "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
-        "-classpath", jars.mkString(File.separator)
-      )
-
-      val settings = new GenericRunnerSettings(scalaOptionError)
-      settings.processArguments(interpArguments, processAll = true)
-
-      settings.classpath.append(System.getProperty("java.class.path") + java.io.File.pathSeparator +
-        "spark-" + config.Webserver.sparkVersion + "/jars/*" + java.io.File.pathSeparator +
-        jars.mkString(java.io.File.pathSeparator))
-
-      settings.usejavacp.value = true
-
-      val out = new PrintWriter(outStream)
-      val interpreter = new AmaSparkILoop(out)
-      interpreter.setSettings(settings)
-
-      interpreter.create
-
-      val intp = interpreter.getIntp
-
-      settings.embeddedDefaults(Thread.currentThread().getContextClassLoader)
-      intp.setContextClassLoader
-      intp.initializeSynchronous
-
-      result = intp
-    }
-    catch {
-      case e: Exception =>
-        println(new Predef.String(outStream.toByteArray))
-
-    }
-
-    interpreter = result
-  }
-
-
-  def createSpark(env: Environment,
-                  sparkAppName: String,
-                  jars: Seq[String],
-                  sparkConf: Option[Map[String, Any]],
-                  executorEnv: Option[Map[String, Any]],
-                  config: ClusterConfig,
-                  hostName: String): SparkSession = {
-
-    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
-    val minicondaPkgsPath = "miniconda/pkgs"
-    val executorMinicondaDirRef = new File(minicondaPkgsPath)
-    val minicondaFiles = if (executorMinicondaDirRef.exists) FileUtils.getAllFiles(executorMinicondaDirRef) else new Array[File](0)
-    val pyfiles = minicondaFiles.filter(f => f.getName.endsWith(".py") ||
-      f.getName.endsWith(".egg") ||
-      f.getName.endsWith(".zip"))
-
-    conf.setAppName(sparkAppName)
-      .set("spark.driver.host", hostName)
-      .set("spark.submit.deployMode", "client")
-      .set("spark.hadoop.validateOutputSpecs", "false")
-      .set("spark.logConf", "true")
-      .set("spark.submit.pyFiles", pyfiles.mkString(","))
-
-
-    val master: String = if (env.master.isEmpty) {
-      "yarn"
-    } else {
-      env.master
-    }
-
-    config.mode match {
-
-      case "mesos" =>
-        conf.set("spark.executor.uri", s"http://$getNode:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz")
-          .setJars(jars)
-          .set("spark.master", env.master)
-          .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-2.2.1-bin-hadoop2.7")
-
-      case "yarn" =>
-        conf.set("spark.home", config.spark.home)
-          // TODO: parameterize those
-          .setJars(s"executor.jar" +: jars)
-          .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
-          .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
-          .set("spark.yarn.queue", "default")
-          .set("spark.history.kerberos.principal", "none")
-
-          .set("spark.master", master)
-          .set("spark.executor.instances", "1") // TODO: change this
-          .set("spark.yarn.jars", s"spark/jars/*")
-          .set("spark.executor.memory", "1g")
-          .set("spark.dynamicAllocation.enabled", "false")
-          .set("spark.eventLog.enabled", "false")
-          .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
-          .set("hadoop.home.dir", config.YARN.hadoopHomeDir)
-
-      case _ => throw new Exception(s"mode ${config.mode} is not legal.")
-    }
-
-    if (config.spark.opts != null && config.spark.opts.nonEmpty) {
-      config.spark.opts.foreach(kv => {
-        log.info(s"Setting ${kv._1} to ${kv._2} as specified in amaterasu.properties")
-        conf.set(kv._1, kv._2)
-      })
-    }
-
-    // adding the configurations from spark.yml
-    sparkConf match {
-      case Some(cnf) => {
-        for (c <- cnf) {
-          if (c._2.isInstanceOf[String])
-            conf.set(c._1, c._2.toString)
-        }
-      }
-      case None =>
-    }
-
-    // setting the executor env from spark_exec.yml
-    executorEnv match {
-      case Some(env) => {
-        for (c <- env) {
-          if (c._2.isInstanceOf[String])
-            conf.setExecutorEnv(c._1, c._2.toString)
-        }
-      }
-      case None =>
-    }
-
-    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-
-    sparkSession = SparkSession.builder
-      .appName(sparkAppName)
-      .master(env.master)
-
-      //.enableHiveSupport()
-      .config(conf).getOrCreate()
-
-    sparkSession.conf.getAll.foreach(x => log.info(x.toString))
-
-    val hc = sparkSession.sparkContext.hadoopConfiguration
-
-    sys.env.get("AWS_ACCESS_KEY_ID") match {
-      case None =>
-      case _ =>
-        hc.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
-        hc.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
-        hc.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
-    }
-
-    sparkSession
-  }
-}
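
A sketch of how the helper above was invoked to build the driver-side SparkSession; the concrete values are placeholders, and env plus the parsed amaterasu.properties (ClusterConfig) are assumed to be available:

    val spark = SparkRunnerHelper.createSpark(
      env          = env,
      sparkAppName = s"amaterasu-$jobId",
      jars         = Seq("executor.jar"),
      sparkConf    = Some(Map("spark.executor.memory" -> "2g")),
      executorEnv  = None,
      config       = clusterConfig,   // org.apache.amaterasu.common.configuration.ClusterConfig
      hostName     = java.net.InetAddress.getLocalHost.getHostName)
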
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
deleted file mode 100755
index a45b8c0..0000000
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.repl.amaterasu.runners.spark
-
-import java.io.ByteArrayOutputStream
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.runtime.AmaContext
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.sql.{Dataset, SparkSession}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.io.Source
-import scala.tools.nsc.interpreter.{IMain, Results}
-
-class ResHolder(var value: Any)
-
-class SparkScalaRunner(var env: Environment,
-                       var jobId: String,
-                       var interpreter: IMain,
-                       var outStream: ByteArrayOutputStream,
-                       var spark: SparkSession,
-                       var notifier: Notifier,
-                       var jars: Seq[String]) extends Logging with AmaterasuRunner {
-
-  private def scalaOptionError(msg: String): Unit = {
-    notifier.error("", msg)
-  }
-
-  override def getIdentifier = "scala"
-
-  val holder = new ResHolder(null)
-
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-    val source = Source.fromString(actionSource)
-    interpretSources(source, actionName, exports.asScala.toMap)
-  }
-
-  def interpretSources(source: Source, actionName: String, exports: Map[String, String]): Unit = {
-
-    notifier.info(s"================= started action $actionName =================")
-    //notifier.info(s"exports is: $exports")
-
-    for (line <- source.getLines()) {
-
-      if (!line.isEmpty) {
-
-        outStream.reset()
-        log.debug(line)
-
-        if (line.startsWith("import")) {
-          interpreter.interpret(line)
-        }
-        else {
-
-          val intresult = interpreter.interpret(line)
-
-          val result = interpreter.prevRequestList.last.lineRep.call("$result")
-
-          // intresult: Success, Error, etc
-          // result: the actual result (RDD, df, etc.) for caching
-          // outStream.toString gives you the error message
-          intresult match {
-            case Results.Success =>
-              log.debug("Results.Success")
-
-              notifier.success(line)
-
-              val resultName = interpreter.prevRequestList.last.termNames.last
-
-              //notifier.info(s" result name ${resultName.toString}")
-              //notifier.info(s" exist in exports: ${exports.contains(resultName.toString)}")
-
-              if (exports.contains(resultName.toString)) {
-
-                val format = exports(resultName.toString)
-
-                if (result != null) {
-
-                  result match {
-                    case ds: Dataset[_] =>
-                      log.debug(s"persisting DataFrame: $resultName")
-                      val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")"""
-                      val writeResult = interpreter.interpret(writeLine)
-                      if (writeResult != Results.Success) {
-                        val err = outStream.toString
-                        notifier.error(writeLine, err)
-                        log.error(s"error persisting dataset: $writeLine failed with: $err")
-                        //throw new Exception(err)
-                      }
-                      log.debug(s"persisted DataFrame: $resultName")
-
-                    case _ => notifier.info(s"""+++> result type ${result.getClass}""")
-                  }
-                }
-              }
-
-            case Results.Error =>
-              log.debug("Results.Error")
-              val err = outStream.toString
-              notifier.error(line, err)
-              throw new Exception(err)
-
-            case Results.Incomplete =>
-              log.debug("Results.Incomplete")
-
-          }
-        }
-      }
-    }
-
-    notifier.info(s"================= finished action $actionName =================")
-  }
-
-  def initializeAmaContext(env: Environment): Unit = {
-
-    // setting up some context :)
-    val sc = this.spark.sparkContext
-    val sqlContext = this.spark.sqlContext
-
-    interpreter.interpret("import scala.util.control.Exception._")
-    interpreter.interpret("import org.apache.spark.{ SparkContext, SparkConf }")
-    interpreter.interpret("import org.apache.spark.sql.SQLContext")
-    interpreter.interpret("import org.apache.spark.sql.{ Dataset, SparkSession }")
-    interpreter.interpret("import org.apache.spark.sql.SaveMode")
-    interpreter.interpret("import org.apache.amaterasu.executor.runtime.AmaContext")
-    interpreter.interpret("import org.apache.amaterasu.common.runtime.Environment")
-
-    // creating a map (_contextStore) to hold the different spark contexts
-    // in the REPL and getting a reference to it
-    interpreter.interpret("var _contextStore = scala.collection.mutable.Map[String, AnyRef]()")
-    val contextStore = interpreter.prevRequestList.last.lineRep.call("$result").asInstanceOf[mutable.Map[String, AnyRef]]
-    AmaContext.init(spark, jobId, env)
-
-    // populating the contextStore
-    contextStore.put("sc", sc)
-    contextStore.put("sqlContext", sqlContext)
-    contextStore.put("env", env)
-    contextStore.put("spark", spark)
-    contextStore.put("ac", AmaContext)
-
-    interpreter.interpret("val sc = _contextStore(\"sc\").asInstanceOf[SparkContext]")
-    interpreter.interpret("val sqlContext = _contextStore(\"sqlContext\").asInstanceOf[SQLContext]")
-    interpreter.interpret("val env = _contextStore(\"env\").asInstanceOf[Environment]")
-    interpreter.interpret("val spark = _contextStore(\"spark\").asInstanceOf[SparkSession]")
-    interpreter.interpret("val AmaContext = _contextStore(\"ac\").asInstanceOf[AmaContext]")
-    interpreter.interpret("import sqlContext.implicits._")
-
-    // initializing the AmaContext
-    println(s"""AmaContext.init(sc, sqlContext ,"$jobId")""")
-
-  }
-
-}
-
-object SparkScalaRunner extends Logging {
-
-  def apply(env: Environment,
-            jobId: String,
-            spark: SparkSession,
-            outStream: ByteArrayOutputStream,
-            notifier: Notifier,
-            jars: Seq[String]): SparkScalaRunner = {
-
-    new SparkScalaRunner(env, jobId, SparkRunnerHelper.getOrCreateScalaInterperter(outStream, jars), outStream, spark, notifier, jars)
-
-  }
-
-}
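
Putting the pieces together, the removed Scala runner was typically obtained through its companion (which reuses the shared interpreter from SparkRunnerHelper) and fed action source plus an exports map naming the REPL values to persist. A minimal sketch, with env, jobId, spark, notifier and jars assumed to be in scope:

    import java.io.ByteArrayOutputStream
    import scala.collection.JavaConverters._

    val outStream   = new ByteArrayOutputStream()
    val scalaRunner = SparkScalaRunner(env, jobId, spark, outStream, notifier, jars)
    scalaRunner.initializeAmaContext(env)

    // "highNoDf" will be written as parquet under
    // ${env.workingDir}/$jobId/start/highNoDf
    scalaRunner.executeSource(
      """val data = spark.range(1, 1000)
        |val highNoDf = data.filter("id > 900").toDF()""".stripMargin,
      "start",
      Map("highNoDf" -> "parquet").asJava)
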
diff --git a/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
deleted file mode 100644
index 5f0ce0e..0000000
--- a/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
+++ /dev/null
@@ -1,4 +0,0 @@
-name,age
-sampath,22
-kirupa,30
-dev,19
\ No newline at end of file
diff --git a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json b/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
deleted file mode 100644
index d297f1f..0000000
--- a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{"name":"Sampath","age":22}
-{"name":"Kirupa", "age":30}
-{"name":"Dev", "age":19}]
\ No newline at end of file
diff --git a/executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
deleted file mode 100644
index 7d66b64..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
deleted file mode 100644
index 74b1890..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/_SUCCESS b/executor/src/test/resources/SparkSql/parquet/_SUCCESS
deleted file mode 100644
index e69de29..0000000
diff --git a/executor/src/test/resources/SparkSql/parquet/_common_metadata b/executor/src/test/resources/SparkSql/parquet/_common_metadata
deleted file mode 100644
index 5d83fd6..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/_common_metadata and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/_metadata b/executor/src/test/resources/SparkSql/parquet/_metadata
deleted file mode 100644
index ac54053..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/_metadata and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index e1b0d2e..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644
index d807ba9..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet and /dev/null differ
diff --git a/executor/src/test/resources/amaterasu.properties b/executor/src/test/resources/amaterasu.properties
deleted file mode 100755
index 19cb189..0000000
--- a/executor/src/test/resources/amaterasu.properties
+++ /dev/null
@@ -1,8 +0,0 @@
-zk=127.0.0.1
-version=0.2.0-incubating
-master=192.168.33.11
-user=root
-mode=mesos
-webserver.port=8000
-webserver.root=dist
-spark.version=2.1.1-bin-hadoop2.7
diff --git a/executor/src/test/resources/codegen.py b/executor/src/test/resources/codegen.py
deleted file mode 100644
index 113d9be..0000000
--- a/executor/src/test/resources/codegen.py
+++ /dev/null
@@ -1,577 +0,0 @@
-"""
-    codegen
-    ~~~~~~~
-
-    Extension to ast that allow ast -> python code generation.
-
-    :copyright: Copyright 2008 by Armin Ronacher.
-    :license: BSD.
-"""
-from ast import *
-
-BINOP_SYMBOLS = {}
-BINOP_SYMBOLS[Add] = '+'
-BINOP_SYMBOLS[Sub] = '-'
-BINOP_SYMBOLS[Mult] = '*'
-BINOP_SYMBOLS[Div] = '/'
-BINOP_SYMBOLS[Mod] = '%'
-BINOP_SYMBOLS[Pow] = '**'
-BINOP_SYMBOLS[LShift] = '<<'
-BINOP_SYMBOLS[RShift] = '>>'
-BINOP_SYMBOLS[BitOr] = '|'
-BINOP_SYMBOLS[BitXor] = '^'
-BINOP_SYMBOLS[BitAnd] = '&'
-BINOP_SYMBOLS[FloorDiv] = '//'
-
-BOOLOP_SYMBOLS = {}
-BOOLOP_SYMBOLS[And] = 'and'
-BOOLOP_SYMBOLS[Or] = 'or'
-
-CMPOP_SYMBOLS = {}
-CMPOP_SYMBOLS[Eq] = '=='
-CMPOP_SYMBOLS[NotEq] = '!='
-CMPOP_SYMBOLS[Lt] = '<'
-CMPOP_SYMBOLS[LtE] = '<='
-CMPOP_SYMBOLS[Gt] = '>'
-CMPOP_SYMBOLS[GtE] = '>='
-CMPOP_SYMBOLS[Is] = 'is'
-CMPOP_SYMBOLS[IsNot] = 'is not'
-CMPOP_SYMBOLS[In] = 'in'
-CMPOP_SYMBOLS[NotIn] = 'not in'
-
-UNARYOP_SYMBOLS = {}
-UNARYOP_SYMBOLS[Invert] = '~'
-UNARYOP_SYMBOLS[Not] = 'not'
-UNARYOP_SYMBOLS[UAdd] = '+'
-UNARYOP_SYMBOLS[USub] = '-'
-
-
-def to_source(node, indent_with=' ' * 4, add_line_information=False):
-    """This function can convert a node tree back into python sourcecode.
-    This is useful for debugging purposes, especially if you're dealing with
-    custom asts not generated by python itself.
-
-    It could be that the sourcecode is evaluable when the AST itself is not
-    compilable / evaluable.  The reason for this is that the AST contains some
-    more data than regular sourcecode does, which is dropped during
-    conversion.
-
-    Each level of indentation is replaced with `indent_with`.  Per default this
-    parameter is equal to four spaces as suggested by PEP 8, but it might be
-    adjusted to match the application's styleguide.
-
-    If `add_line_information` is set to `True` comments for the line numbers
-    of the nodes are added to the output.  This can be used to spot wrong line
-    number information of statement nodes.
-    """
-    generator = SourceGenerator(indent_with, add_line_information)
-    generator.visit(node)
-
-    return ''.join(generator.result)
-
-class SourceGenerator(NodeVisitor):
-    """This visitor is able to transform a well formed syntax tree into python
-    sourcecode.  For more details have a look at the docstring of the
-    `node_to_source` function.
-    """
-
-    def __init__(self, indent_with, add_line_information=False):
-        self.result = []
-        self.indent_with = indent_with
-        self.add_line_information = add_line_information
-        self.indentation = 0
-        self.new_lines = 0
-
-    def write(self, x):
-        if self.new_lines:
-            if self.result:
-                self.result.append('\n' * self.new_lines)
-            self.result.append(self.indent_with * self.indentation)
-            self.new_lines = 0
-        self.result.append(x)
-
-    def newline(self, node=None, extra=0):
-        self.new_lines = max(self.new_lines, 1 + extra)
-        if node is not None and self.add_line_information:
-            self.write('# line: %s' % node.lineno)
-            self.new_lines = 1
-
-    def body(self, statements):
-        self.new_line = True
-        self.indentation += 1
-        for stmt in statements:
-            self.visit(stmt)
-        self.indentation -= 1
-
-    def body_or_else(self, node):
-        self.body(node.body)
-        if node.orelse:
-            self.newline()
-            self.write('else:')
-            self.body(node.orelse)
-
-    def signature(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        padding = [None] * (len(node.args) - len(node.defaults))
-        for arg, default in zip(node.args, padding + node.defaults):
-            write_comma()
-            self.visit(arg)
-            if default is not None:
-                self.write('=')
-                self.visit(default)
-        if node.vararg is not None:
-            write_comma()
-            self.write('*' + node.vararg)
-        if node.kwarg is not None:
-            write_comma()
-            self.write('**' + node.kwarg)
-
-    def decorators(self, node):
-        for decorator in node.decorator_list:
-            self.newline(decorator)
-            self.write('@')
-            self.visit(decorator)
-
-    # Statements
-
-    def visit_Assert(self, node):
-        self.newline(node)
-        self.write('assert ')
-        self.visit(node.test)
-        if node.msg is not None:
-           self.write(', ')
-           self.visit(node.msg)
-
-    def visit_Assign(self, node):
-        self.newline(node)
-        for idx, target in enumerate(node.targets):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-        self.write(' = ')
-        self.visit(node.value)
-
-    def visit_AugAssign(self, node):
-        self.newline(node)
-        self.visit(node.target)
-        self.write(' ' + BINOP_SYMBOLS[type(node.op)] + '= ')
-        self.visit(node.value)
-
-    def visit_ImportFrom(self, node):
-        self.newline(node)
-        self.write('from %s%s import ' % ('.' * node.level, node.module))
-        for idx, item in enumerate(node.names):
-            if idx:
-                self.write(', ')
-            self.write(item)
-
-    def visit_Import(self, node):
-        self.newline(node)
-        for item in node.names:
-            self.write('import ')
-            self.visit(item)
-
-    def visit_Expr(self, node):
-        self.newline(node)
-        self.generic_visit(node)
-
-    def visit_FunctionDef(self, node):
-        self.newline(extra=1)
-        self.decorators(node)
-        self.newline(node)
-        self.write('def %s(' % node.name)
-        self.visit(node.args)
-        self.write('):')
-        self.body(node.body)
-
-    def visit_ClassDef(self, node):
-        have_args = []
-        def paren_or_comma():
-            if have_args:
-                self.write(', ')
-            else:
-                have_args.append(True)
-                self.write('(')
-
-        self.newline(extra=2)
-        self.decorators(node)
-        self.newline(node)
-        self.write('class %s' % node.name)
-        for base in node.bases:
-            paren_or_comma()
-            self.visit(base)
-        # XXX: the if here is used to keep this module compatible
-        #      with python 2.6.
-        if hasattr(node, 'keywords'):
-            for keyword in node.keywords:
-                paren_or_comma()
-                self.write(keyword.arg + '=')
-                self.visit(keyword.value)
-            if node.starargs is not None:
-                paren_or_comma()
-                self.write('*')
-                self.visit(node.starargs)
-            if node.kwargs is not None:
-                paren_or_comma()
-                self.write('**')
-                self.visit(node.kwargs)
-        self.write(have_args and '):' or ':')
-        self.body(node.body)
-
-    def visit_If(self, node):
-        self.newline(node)
-        self.write('if ')
-        self.visit(node.test)
-        self.write(':')
-        self.body(node.body)
-        while True:
-            else_ = node.orelse
-            if len(else_) == 0:
-                break
-            elif len(else_) == 1 and isinstance(else_[0], If):
-                node = else_[0]
-                self.newline()
-                self.write('elif ')
-                self.visit(node.test)
-                self.write(':')
-                self.body(node.body)
-            else:
-                self.newline()
-                self.write('else:')
-                self.body(else_)
-                break
-
-    def visit_For(self, node):
-        self.newline(node)
-        self.write('for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_While(self, node):
-        self.newline(node)
-        self.write('while ')
-        self.visit(node.test)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_With(self, node):
-        self.newline(node)
-        self.write('with ')
-        self.visit(node.context_expr)
-        if node.optional_vars is not None:
-            self.write(' as ')
-            self.visit(node.optional_vars)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_Pass(self, node):
-        self.newline(node)
-        self.write('pass')
-
-    def visit_Print(self, node):
-        # XXX: python 2.6 only
-        self.newline(node)
-        self.write('print ')
-        want_comma = False
-        if node.dest is not None:
-            self.write(' >> ')
-            self.visit(node.dest)
-            want_comma = True
-        for value in node.values:
-            if want_comma:
-                self.write(', ')
-            self.visit(value)
-            want_comma = True
-        if not node.nl:
-            self.write(',')
-
-    def visit_Delete(self, node):
-        self.newline(node)
-        self.write('del ')
-        for idx, target in enumerate(node):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-
-    def visit_TryExcept(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        for handler in node.handlers:
-            self.visit(handler)
-
-    def visit_TryFinally(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        self.newline(node)
-        self.write('finally:')
-        self.body(node.finalbody)
-
-    def visit_Global(self, node):
-        self.newline(node)
-        self.write('global ' + ', '.join(node.names))
-
-    def visit_Nonlocal(self, node):
-        self.newline(node)
-        self.write('nonlocal ' + ', '.join(node.names))
-
-    def visit_Return(self, node):
-        self.newline(node)
-        if node.value is None:
-            self.write('return')
-        else:
-            self.write('return ')
-            self.visit(node.value)
-
-    def visit_Break(self, node):
-        self.newline(node)
-        self.write('break')
-
-    def visit_Continue(self, node):
-        self.newline(node)
-        self.write('continue')
-
-    def visit_Raise(self, node):
-        # XXX: Python 2.6 / 3.0 compatibility
-        self.newline(node)
-        self.write('raise')
-        if hasattr(node, 'exc') and node.exc is not None:
-            self.write(' ')
-            self.visit(node.exc)
-            if node.cause is not None:
-                self.write(' from ')
-                self.visit(node.cause)
-        elif hasattr(node, 'type') and node.type is not None:
-            self.visit(node.type)
-            if node.inst is not None:
-                self.write(', ')
-                self.visit(node.inst)
-            if node.tback is not None:
-                self.write(', ')
-                self.visit(node.tback)
-
-    # Expressions
-
-    def visit_Attribute(self, node):
-        self.visit(node.value)
-        self.write('.' + node.attr)
-
-    def visit_Call(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        self.visit(node.func)
-        self.write('(')
-        for arg in node.args:
-            write_comma()
-            self.visit(arg)
-        for keyword in node.keywords:
-            write_comma()
-            self.write(keyword.arg + '=')
-            self.visit(keyword.value)
-        if node.starargs is not None:
-            write_comma()
-            self.write('*')
-            self.visit(node.starargs)
-        if node.kwargs is not None:
-            write_comma()
-            self.write('**')
-            self.visit(node.kwargs)
-        self.write(')')
-
-    def visit_Name(self, node):
-        self.write(node.id)
-
-    def visit_Str(self, node):
-        self.write(repr(node.s))
-
-    def visit_Bytes(self, node):
-        self.write(repr(node.s))
-
-    def visit_Num(self, node):
-        self.write(repr(node.n))
-
-    def visit_Tuple(self, node):
-        self.write('(')
-        idx = -1
-        for idx, item in enumerate(node.elts):
-            if idx:
-                self.write(', ')
-            self.visit(item)
-        self.write(idx and ')' or ',)')
-
-    def sequence_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            for idx, item in enumerate(node.elts):
-                if idx:
-                    self.write(', ')
-                self.visit(item)
-            self.write(right)
-        return visit
-
-    visit_List = sequence_visit('[', ']')
-    visit_Set = sequence_visit('{', '}')
-    del sequence_visit
-
-    def visit_Dict(self, node):
-        self.write('{')
-        for idx, (key, value) in enumerate(zip(node.keys, node.values)):
-            if idx:
-                self.write(', ')
-            self.visit(key)
-            self.write(': ')
-            self.visit(value)
-        self.write('}')
-
-    def visit_BinOp(self, node):
-        self.visit(node.left)
-        self.write(' %s ' % BINOP_SYMBOLS[type(node.op)])
-        self.visit(node.right)
-
-    def visit_BoolOp(self, node):
-        self.write('(')
-        for idx, value in enumerate(node.values):
-            if idx:
-                self.write(' %s ' % BOOLOP_SYMBOLS[type(node.op)])
-            self.visit(value)
-        self.write(')')
-
-    def visit_Compare(self, node):
-        self.write('(')
-        self.visit(node.left)
-        for op, right in zip(node.ops, node.comparators):
-            self.write(' %s ' % CMPOP_SYMBOLS[type(op)])
-            self.visit(right)
-        self.write(')')
-
-    def visit_UnaryOp(self, node):
-        self.write('(')
-        op = UNARYOP_SYMBOLS[type(node.op)]
-        self.write(op)
-        if op == 'not':
-            self.write(' ')
-        self.visit(node.operand)
-        self.write(')')
-
-    def visit_Subscript(self, node):
-        self.visit(node.value)
-        self.write('[')
-        self.visit(node.slice)
-        self.write(']')
-
-    def visit_Slice(self, node):
-        if node.lower is not None:
-            self.visit(node.lower)
-        self.write(':')
-        if node.upper is not None:
-            self.visit(node.upper)
-        if node.step is not None:
-            self.write(':')
-            if not (isinstance(node.step, Name) and node.step.id == 'None'):
-                self.visit(node.step)
-
-    def visit_ExtSlice(self, node):
-        for idx, item in node.dims:
-            if idx:
-                self.write(', ')
-            self.visit(item)
-
-    def visit_Yield(self, node):
-        self.write('yield ')
-        self.visit(node.value)
-
-    def visit_Lambda(self, node):
-        self.write('lambda ')
-        self.visit(node.args)
-        self.write(': ')
-        self.visit(node.body)
-
-    def visit_Ellipsis(self, node):
-        self.write('Ellipsis')
-
-    def generator_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            self.visit(node.elt)
-            for comprehension in node.generators:
-                self.visit(comprehension)
-            self.write(right)
-        return visit
-
-    visit_ListComp = generator_visit('[', ']')
-    visit_GeneratorExp = generator_visit('(', ')')
-    visit_SetComp = generator_visit('{', '}')
-    del generator_visit
-
-    def visit_DictComp(self, node):
-        self.write('{')
-        self.visit(node.key)
-        self.write(': ')
-        self.visit(node.value)
-        for comprehension in node.generators:
-            self.visit(comprehension)
-        self.write('}')
-
-    def visit_IfExp(self, node):
-        self.visit(node.body)
-        self.write(' if ')
-        self.visit(node.test)
-        self.write(' else ')
-        self.visit(node.orelse)
-
-    def visit_Starred(self, node):
-        self.write('*')
-        self.visit(node.value)
-
-    def visit_Repr(self, node):
-        # XXX: python 2.6 only
-        self.write('`')
-        self.visit(node.value)
-        self.write('`')
-
-    # Helper Nodes
-
-    def visit_alias(self, node):
-        self.write(node.name)
-        if node.asname is not None:
-            self.write(' as ' + node.asname)
-
-    def visit_comprehension(self, node):
-        self.write(' for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        if node.ifs:
-            for if_ in node.ifs:
-                self.write(' if ')
-                self.visit(if_)
-
-    def visit_excepthandler(self, node):
-        self.newline(node)
-        self.write('except')
-        if node.type is not None:
-            self.write(' ')
-            self.visit(node.type)
-            if node.name is not None:
-                self.write(' as ')
-                self.visit(node.name)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_arguments(self, node):
-        self.signature(node)
diff --git a/executor/src/test/resources/py4j-0.10.4-src.zip b/executor/src/test/resources/py4j-0.10.4-src.zip
deleted file mode 100644
index 8c3829e..0000000
Binary files a/executor/src/test/resources/py4j-0.10.4-src.zip and /dev/null differ
diff --git a/executor/src/test/resources/py4j.tar.gz b/executor/src/test/resources/py4j.tar.gz
deleted file mode 100644
index 761a0af..0000000
Binary files a/executor/src/test/resources/py4j.tar.gz and /dev/null differ
diff --git a/executor/src/test/resources/pyspark-with-amacontext.py b/executor/src/test/resources/pyspark-with-amacontext.py
deleted file mode 100755
index c940eea..0000000
--- a/executor/src/test/resources/pyspark-with-amacontext.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
-    def __init__(self, sc, spark, job_id, env):
-        self.sc = sc
-        self.spark = spark
-        self.job_id = job_id
-        self.env = env
-
-    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
-        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
-    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
-        self.name = name
-        self.master = master
-        self.input_root_path = input_root_path
-        self.output_root_path = output_root_path
-        self.working_dir = working_dir
-        self.configuration = configuration
-
-data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-rdd = ama_context.sc.parallelize(data)
-odd = rdd.filter(lambda num: num % 2 != 0)
\ No newline at end of file
diff --git a/executor/src/test/resources/pyspark.tar.gz b/executor/src/test/resources/pyspark.tar.gz
deleted file mode 100644
index 6f25984..0000000
Binary files a/executor/src/test/resources/pyspark.tar.gz and /dev/null differ
diff --git a/executor/src/test/resources/pyspark.zip b/executor/src/test/resources/pyspark.zip
deleted file mode 100644
index a624c9f..0000000
Binary files a/executor/src/test/resources/pyspark.zip and /dev/null differ
diff --git a/executor/src/test/resources/runtime.py b/executor/src/test/resources/runtime.py
deleted file mode 100644
index d01664c..0000000
--- a/executor/src/test/resources/runtime.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
-    def __init__(self, sc, spark, job_id, env):
-        self.sc = sc
-        self.spark = spark
-        self.job_id = job_id
-        self.env = env
-
-    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
-        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
-    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
-        self.name = name
-        self.master = master
-        self.input_root_path = input_root_path
-        self.output_root_path = output_root_path
-        self.working_dir = working_dir
-        self.configuration = configuration
diff --git a/executor/src/test/resources/simple-pyspark.py b/executor/src/test/resources/simple-pyspark.py
deleted file mode 100755
index 923f81c..0000000
--- a/executor/src/test/resources/simple-pyspark.py
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-try:
-    rdd = sc.parallelize(data)
-
-    def g(x):
-        print(x)
-
-    rdd.foreach(g)
-except Exception as e:
-    print type(e), e
\ No newline at end of file
diff --git a/executor/src/test/resources/simple-python-err.py b/executor/src/test/resources/simple-python-err.py
deleted file mode 100755
index dff1491..0000000
--- a/executor/src/test/resources/simple-python-err.py
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5]
-1/0
-
-with open('/tmp/amatest-in.txt', 'a') as the_file:
-    the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/executor/src/test/resources/simple-python.py b/executor/src/test/resources/simple-python.py
deleted file mode 100755
index 0ac6f85..0000000
--- a/executor/src/test/resources/simple-python.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5]
-print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
-print(data)
-
-with open('/tmp/amatest-in.txt', 'a') as the_file:
-    the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/executor/src/test/resources/simple-spark.scala b/executor/src/test/resources/simple-spark.scala
deleted file mode 100755
index 802547c..0000000
--- a/executor/src/test/resources/simple-spark.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.amaterasu.executor.runtime.AmaContext
-import org.apache.spark.sql.{DataFrame, SaveMode}
-
-val data = Array(1, 2, 3, 4, 5)
-
-val sc = AmaContext.sc
-val rdd = sc.parallelize(data)
-val sqlContext = AmaContext.sqlContext
-
-import sqlContext.implicits._
-val x: DataFrame = rdd.toDF()
-
-x.write.mode(SaveMode.Overwrite)
\ No newline at end of file
diff --git a/executor/src/test/resources/spark_intp.py b/executor/src/test/resources/spark_intp.py
deleted file mode 100755
index a427e92..0000000
--- a/executor/src/test/resources/spark_intp.py
+++ /dev/null
@@ -1,109 +0,0 @@
-#!/usr/bin/python
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import ast
-import codegen
-import os
-import sys
-import zipimport
-from runtime import AmaContext, Environment
-
-os.chdir(os.getcwd() + '/build/resources/test/')
-import zipfile
-zip = zipfile.ZipFile('pyspark.zip')
-zip.extractall()
-zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
-zip.extractall()
-sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
-sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
-
-# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
-# py4j_importer = zipimport.zipimporter(py4j_path)
-# py4j = py4j_importer.load_module('py4j')
-from py4j.java_gateway import JavaGateway, GatewayClient, java_import
-from py4j.protocol import Py4JJavaError
-from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
-from pyspark.rdd import RDD
-from pyspark.files import SparkFiles
-from pyspark.storagelevel import StorageLevel
-from pyspark import accumulators
-from pyspark.accumulators import Accumulator, AccumulatorParam
-from pyspark.broadcast import Broadcast
-from pyspark.serializers import MarshalSerializer, PickleSerializer
-from pyspark.sql import SparkSession
-from pyspark.sql import Row
-
-client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client, auto_convert=True)
-entry_point = gateway.entry_point
-queue = entry_point.getExecutionQueue()
-
-java_import(gateway.jvm, "org.apache.spark.SparkEnv")
-java_import(gateway.jvm, "org.apache.spark.SparkConf")
-java_import(gateway.jvm, "org.apache.spark.api.java.*")
-java_import(gateway.jvm, "org.apache.spark.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.sql.*")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-java_import(gateway.jvm, "scala.Tuple2")
-
-jconf = entry_point.getSparkConf()
-jsc = entry_point.getJavaSparkContext()
-
-job_id = entry_point.getJobId()
-javaEnv = entry_point.getEnv()
-
-env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
-conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
-conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
-sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-spark = SparkSession(sc, entry_point.getSparkSession())
-
-ama_context = AmaContext(sc, spark, job_id, env)
-
-while True:
-    actionData = queue.getNext()
-    resultQueue = entry_point.getResultQueue(actionData._2())
-    actionSource = actionData._1()
-    tree = ast.parse(actionSource)
-    exports = actionData._3()
-
-    for node in tree.body:
-
-        wrapper = ast.Module(body=[node])
-        try:
-            co = compile(wrapper, "<ast>", 'exec')
-            exec (co)
-            resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
-
-            #if this node is an assignment, we need to check if it needs to be persisted
-            try:
-                persistCode = ''
-                if(isinstance(node,ast.Assign)):
-                    varName = node.targets[0].id
-                    if(exports.containsKey(varName)):
-                        persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
-                        persist = compile(persistCode, '<stdin>', 'exec')
-                        exec(persist)
-
-            except:
-                resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
-        except:
-            resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
-    resultQueue.put('completion', '', '', '')
\ No newline at end of file
diff --git a/executor/src/test/resources/step-2.scala b/executor/src/test/resources/step-2.scala
deleted file mode 100755
index 4b0dfca..0000000
--- a/executor/src/test/resources/step-2.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.amaterasu.executor.runtime.AmaContext
-
-val oddRdd = AmaContext.getRDD[Int]("start", "rdd").filter(x=>x/2 == 0)
-oddRdd.take(5).foreach(println)
-
-val highNoDf = AmaContext.getDataFrame("start", "x").where("_1 > 3")
-highNoDf.show
diff --git a/executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala b/executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala
deleted file mode 100644
index 2decb9c..0000000
--- a/executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.RunnersTests
-
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.scalatest._
-
-@DoNotDiscover
-class RunnersLoadingTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  var env: Environment = _
-  var factory: ProvidersFactory = _
-
-  "RunnersFactory" should "be loaded with all the implementations of AmaterasuRunner in its classpath" in {
-    val r = factory.getRunner("spark", "scala")
-    r should not be null
-  }
-}
\ No newline at end of file
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala
deleted file mode 100755
index f12d676..0000000
--- a/executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.spark
-
-import java.io.File
-
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-import scala.io.Source
-
-@DoNotDiscover
-class PySparkRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  Logger.getLogger("spark").setLevel(Level.OFF)
-  Logger.getLogger("jetty").setLevel(Level.OFF)
-  Logger.getRootLogger.setLevel(Level.OFF)
-
-  var factory: ProvidersFactory = _
-
-  def delete(file: File) {
-    if (file.isDirectory)
-      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
-    file.delete
-  }
-
-  override protected def afterAll(): Unit = {
-    val pysparkDir = new File(getClass.getResource("/pyspark").getPath)
-    val py4jDir = new File(getClass.getResource("/py4j").getPath)
-    delete(pysparkDir)
-    delete(py4jDir)
-    super.afterAll()
-  }
-
-
-  "PySparkRunner.executeSource" should "execute simple python code" in {
-    val src = Source.fromFile(getClass.getResource("/simple-python.py").getPath).mkString
-    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-    println("3333333333333333333333")
-    runner.executeSource(src, "test_action1", Map.empty[String, String].asJava)
-  }
-
-  it should "print and trows an errors" in {
-    a[java.lang.Exception] should be thrownBy {
-      val src = Source.fromFile(getClass.getResource("/simple-python-err.py").getPath).mkString
-      var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-      runner.executeSource(src, "test_action2", Map.empty[String, String].asJava)
-    }
-  }
-
-  it should "also execute spark code written in python" in {
-    val src = Source.fromFile(getClass.getResource("/simple-pyspark.py").getPath).mkString
-    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-    runner.executeSource(src, "test_action3", Map("numDS" -> "parquet").asJava)
-  }
-
-  it should "also execute spark code written in python with AmaContext being used" in {
-    val src = Source.fromFile(getClass.getResource("/pyspark-with-amacontext.py").getPath).mkString
-    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-    runner.executeSource(src, "test_action4", Map.empty[String, String].asJava)
-  }
-
-}
\ No newline at end of file
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
deleted file mode 100755
index cf7aa0d..0000000
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//package org.apache.amaterasu.spark
-//
-//import java.io.File
-//
-//import org.apache.amaterasu.common.runtime._
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.utilities.TestNotifier
-//
-//import scala.collection.JavaConverters._
-//import org.apache.commons.io.FileUtils
-//import java.io.ByteArrayOutputStream
-//
-//import org.apache.spark.SparkConf
-//import org.apache.spark.repl.Main
-//import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner}
-//import org.apache.spark.sql.SparkSession
-//import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-//
-//class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-//
-//  var runner: SparkScalaRunner = _
-//
-//  override protected def beforeAll(): Unit = {
-//
-//    FileUtils.deleteQuietly(new File("/tmp/job_5/"))
-//
-//    val env = Environment()
-//    env.workingDir = "file:///tmp"
-//    env.master = "local[*]"
-//
-//
-//    val spark = SparkRunnerHelper.createSpark(env, "job_5", Seq.empty[String], Map.empty)
-//
-//
-//    val notifier = new TestNotifier()
-//    val strm = new ByteArrayOutputStream()
-//    runner = SparkScalaRunner(env, "job_5", spark, strm, notifier, Seq.empty[String])
-//    super.beforeAll()
-//  }
-//
-//  "SparkScalaRunner" should "execute the simple-spark.scala" in {
-//
-//    val script = getClass.getResource("/simple-spark.scala").getPath
-//    runner.executeSource(script, "start", Map.empty[String, String].asJava)
-//
-//  }
-//
-//  "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
-//
-//    val script = getClass.getResource("/step-2.scala").getPath
-//    runner.executeSource(script, "cont", Map.empty[String, String].asJava)
-//
-//  }
-//}
\ No newline at end of file
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
deleted file mode 100644
index 90cf73b..0000000
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.spark
-
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner
-import org.apache.amaterasu.utilities.TestNotifier
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Created by kirupa on 10/12/16.
-  */
-@DoNotDiscover
-class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  Logger.getLogger("spark").setLevel(Level.OFF)
-  Logger.getLogger("jetty").setLevel(Level.OFF)
-  Logger.getRootLogger.setLevel(Level.OFF)
-
-
-  val notifier = new TestNotifier()
-
-  var factory: ProvidersFactory = _
-  var env: Environment = _
-
-  /*
-  Test whether parquet is used as default file format to load data from previous actions
-   */
-
-  "SparkSql" should "load data as parquet if no input foramt is specified" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
-
-    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqldefaultparquetjobaction/sparksqldefaultparquetjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqldefaultparquetjobaction_sparksqldefaultparquetjobactiontempdf where age=22", "sql_parquet_test", Map("result" -> "parquet").asJava)
-
-    val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result")
-    println("Output Default Parquet: " + inputDf.count + "," + outputDf.first().getString(1))
-    outputDf.first().getString(1) shouldEqual "Michael"
-  }
-
-  /*
-  Test whether the parquet data is successfully parsed, loaded and processed by SparkSQL
-   */
-
-  "SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
-    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqlparquetjobaction/sparksqlparquetjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlparquetjobaction_sparksqlparquetjobactiontempdf READAS parquet", "sql_parquet_test", Map("result2" -> "parquet").asJava)
-
-    val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result2")
-    println("Output Parquet: " + inputDf.count + "," + outputDf.count)
-    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
-  }
-
-
-  /*
-  Test whether the JSON data is successfully parsed, loaded by SparkSQL
-  */
-
-  "SparkSql" should "load JSON data directly from previous action's dataframe and persist the Data in working directory" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
-
-    inputDf.write.mode(SaveMode.Overwrite).json(s"${env.workingDir}/${sparkSql.jobId}/sparksqljsonjobaction/sparksqljsonjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqljsonjobaction_sparksqljsonjobactiontempdf  where age='30' READAS json", "sql_json_test", Map("result" -> "json").asJava)
-
-    val outputDf = spark.read.json(s"${env.workingDir}/${sparkSql.jobId}/sql_json_test/result")
-    println("Output JSON: " + inputDf.count + "," + outputDf.count)
-    outputDf.first().getString(1) shouldEqual "Kirupa"
-
-  }
-
-  /*
-  Test whether the CSV data is successfully parsed, loaded by SparkSQL
-  */
-
-  "SparkSql" should "load CSV data directly from previous action's dataframe and persist the Data in working directory" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
-    inputDf.write.mode(SaveMode.Overwrite).csv(s"${env.workingDir}/${sparkSql.jobId}/sparksqlcsvjobaction/sparksqlcsvjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlcsvjobaction_sparksqlcsvjobactiontempdf READAS csv", "sql_csv_test", Map("result" -> "csv").asJava)
-
-
-    val outputDf = spark.read.csv(s"${env.workingDir}/${sparkSql.jobId}/sql_csv_test/result")
-    println("Output CSV: " + inputDf.count + "," + outputDf.count)
-    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
-  }
-
-  /*
-  Test whether the data can be directly read from a file and executed by sparkSql
-  */
-//  "SparkSql" should "load data directly from a file and persist the Data in working directory" in {
-//
-//    val tempFileEnv = Environment()
-//    tempFileEnv.workingDir = "file:/tmp/"
-//    AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
-//
-//    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", notifier, spark)
-//    sparkSql.executeSource("SELECT * FROM parquet.`" + getClass.getResource("/SparkSql/parquet").getPath + "`", "sql_parquet_file_test", Map("result" -> "parquet").asJava)
-//    val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
-//    println("Output Parquet dataframe: " + outputParquetDf.show)
-//    outputParquetDf.first().getString(1) shouldEqual "Michael"
-//    sparkSql.executeSource("SELECT * FROM json.`" + getClass.getResource("/SparkSql/json").getPath + "`","sql_parquet_file_test", Map("result" -> "json").asJava)
-//
-//    val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
-//    println("Output Json dataframe: " + outputJsonDf.show)
-//    outputJsonDf.first().getString(1) shouldEqual "Sampath"
-//
-//  }
-
-
-}
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
deleted file mode 100644
index 8a1e549..0000000
--- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.spark
-
-import java.io.{ByteArrayOutputStream, File}
-
-import org.apache.amaterasu.RunnersTests.RunnersLoadingTests
-import org.apache.amaterasu.common.dataobjects.ExecData
-import org.apache.amaterasu.common.execution.dependencies._
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.utilities.TestNotifier
-import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner
-import org.apache.spark.sql.SparkSession
-import org.scalatest._
-
-
-
-import scala.collection.mutable.ListBuffer
-
-
-class SparkTestsSuite extends Suites(
-  new PySparkRunnerTests,
-  new RunnersLoadingTests,
-  new SparkSqlRunnerTests) with BeforeAndAfterAll {
-
-  var env: Environment = _
-  var factory: ProvidersFactory = _
-  var spark: SparkSession = _
-
-  private def createTestMiniconda(): Unit = {
-    println(s"PATH: ${new File(".").getAbsolutePath}")
-    new File("miniconda/pkgs").mkdirs()
-  }
-
-  override def beforeAll(): Unit = {
-
-    // I can't apologise enough for this
-    val resources = new File(getClass.getResource("/spark_intp.py").getPath).getParent
-    val workDir = new File(resources).getParentFile.getParent
-
-    env = Environment()
-    env.workingDir = s"file://$workDir"
-
-    env.master = "local[1]"
-    if (env.configuration != null) env.configuration ++ "pysparkPath" -> "/usr/bin/python" else env.configuration = Map(
-      "pysparkPath" -> "/usr/bin/python",
-      "cwd" -> resources
-    )
-
-    val excEnv = Map[String, Any](
-      "PYTHONPATH" -> resources
-    )
-    createTestMiniconda()
-    env.configuration ++ "spark_exec_env" -> excEnv
-    factory = ProvidersFactory(ExecData(env,
-      Dependencies(ListBuffer.empty[Repo], List.empty[Artifact]),
-      PythonDependencies(List.empty[PythonPackage]),
-      Map(
-        "spark" -> Map.empty[String, Any],
-        "spark_exec_env" -> Map("PYTHONPATH" -> resources))),
-      "test",
-      new ByteArrayOutputStream(),
-      new TestNotifier(),
-      "test",
-      "localhost",
-      getClass.getClassLoader.getResource("amaterasu.properties").getPath)
-    spark = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner].spark
-
-    this.nestedSuites.filter(s => s.isInstanceOf[RunnersLoadingTests]).foreach(s => s.asInstanceOf[RunnersLoadingTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[PySparkRunnerTests]).foreach(s => s.asInstanceOf[PySparkRunnerTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].env = env)
-
-
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    new File("miniconda").delete()
-    spark.stop()
-
-    super.afterAll()
-  }
-
-}
diff --git a/executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala b/executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
deleted file mode 100644
index 16cb97b..0000000
--- a/executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.utilities
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-
-
-class TestNotifier extends Notifier with Logging {
-
-  override def info(msg: String): Unit = {
-    log.info(msg)
-  }
-
-  override def success(line: String): Unit = {
-    log.info(s"successfully executed line: $line")
-  }
-
-  override def error(line: String, msg: String): Unit = {
-    log.error(s"Error executing line: $line message: $msg")
-  }
-}
diff --git a/runners/spark-runner/build.gradle b/runners/spark-runner/build.gradle
new file mode 100644
index 0000000..588bd29
--- /dev/null
+++ b/runners/spark-runner/build.gradle
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'com.github.johnrengelman.shadow' version '1.2.4'
+    id 'com.github.maiflai.scalatest' version '0.6-5-g9065d91'
+    id 'scala'
+    id 'java'
+}
+
+shadowJar {
+    zip64 true
+}
+
+//sourceCompatibility = 1.8
+//targetCompatibility = 1.8
+
+repositories {
+    maven {
+        url "https://plugins.gradle.org/m2/"
+    }
+    mavenCentral()
+}
+
+test {
+    maxParallelForks = 1
+    forkEvery = 1
+}
+
+configurations {
+    provided
+    runtime.exclude module: 'hadoop-common'
+    runtime.exclude module: 'hadoop-yarn-api'
+    runtime.exclude module: 'hadoop-yarn-client'
+    runtime.exclude module: 'hadoop-hdfs'
+    runtime.exclude module: 'mesos'
+    runtime.exclude module: 'scala-compiler'
+}
+
+sourceSets {
+    main.compileClasspath += configurations.provided
+    test.compileClasspath += configurations.provided
+    test.runtimeClasspath += configurations.provided
+}
+
+dependencies {
+
+    compile project(':executor')
+    //runtime dependency for spark
+    provided('org.apache.spark:spark-repl_2.11:2.2.1')
+    provided('org.apache.spark:spark-core_2.11:2.2.1')
+
+    testCompile project(':common')
+    testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
+    testRuntime 'org.pegdown:pegdown:1.1.0'
+    testCompile 'junit:junit:4.11'
+    testCompile 'org.scalatest:scalatest_2.11:3.0.2'
+    testCompile('org.apache.spark:spark-repl_2.11:2.2.1')
+    testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
+
+}
+
+sourceSets {
+    test {
+        resources.srcDirs += [file('src/test/resources')]
+    }
+
+    // this is done so Scala will compile before Java
+    main {
+        scala {
+            srcDirs = ['src/main/scala', 'src/main/java']
+        }
+        java {
+            srcDirs = []
+        }
+    }
+}
+
+test {
+
+    maxParallelForks = 1
+}
+
+task copyToHome(type: Copy) {
+    dependsOn shadowJar
+    from 'build/libs'
+    into '../../build/amaterasu/dist'
+    from 'build/resources/main'
+    into '../../build/amaterasu/dist'
+}
diff --git a/runners/spark-runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/AmaContext.scala b/runners/spark-runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/AmaContext.scala
new file mode 100644
index 0000000..68c8116
--- /dev/null
+++ b/runners/spark-runner/src/main/scala/org/apache/amaterasu/executor/runner/spark/AmaContext.scala
@@ -0,0 +1,37 @@
+package org.apache.amaterasu.executor.runner.spark
+
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession}
+
+/**
+  * @author Arun Manivannan
+  */
+object AmaContext extends Logging {
+
+  var spark: SparkSession = _
+  var sc: SparkContext = _
+  var jobId: String = _
+  var env: Environment = _
+
+  def init(spark: SparkSession,
+           jobId: String,
+           env: Environment): Unit = {
+
+    AmaContext.spark = spark
+    AmaContext.sc = spark.sparkContext
+    AmaContext.jobId = jobId
+    AmaContext.env = env
+
+  }
+
+  def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = {
+    spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName")
+  }
+
+  def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = {
+    getDataFrame(actionName, dfName, format).as[T]
+  }
+
+}
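(For orientation: a minimal usage sketch of the relocated AmaContext API above. The case class, action name and dataframe name are illustrative only, and the snippet assumes AmaContext.init has already been called by the Spark runner before the action script executes.)

    import org.apache.amaterasu.executor.runner.spark.AmaContext

    // bring the SparkSession implicits into scope via a stable identifier
    val spark = AmaContext.spark
    import spark.implicits._

    // hypothetical schema for data persisted by a previous action named "start"
    case class Person(name: String, age: Long)

    val people = AmaContext.getDataset[Person]("start", "people")
    people.filter(_.age > 21).show()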
diff --git a/settings.gradle b/settings.gradle
index 1056e01..c222795 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,8 +15,20 @@
  * limitations under the License.
  */
 include 'leader'
-include 'executor'
+project(':leader')
+
 include 'common'
+project(':common')
+
+include 'executor'
+project(':executor')
+
 include 'sdk'
 findProject(':sdk')?.name = 'amaterasu-sdk'
 
+//Spark
+include 'spark-runner'
+project(':spark-runner').projectDir=file("frameworks/spark/runner")
+include 'spark-runtime'
+project(':spark-runtime').projectDir=file("frameworks/spark/runtime")
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Refactor Spark out of Amaterasu executor to its own project
> -----------------------------------------------------------
>
>                 Key: AMATERASU-24
>                 URL: https://issues.apache.org/jira/browse/AMATERASU-24
>             Project: AMATERASU
>          Issue Type: Improvement
>            Reporter: Yaniv Rodenski
>            Assignee: Arun Manivannan
>            Priority: Major
>             Fix For: 0.2.1-incubating
>
>
> The Spark framework is part of the Amaterasu executor and leader; it needs to be moved into its own project under a new frameworks folder (a sketch of the impact on action scripts follows below).
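A rough sketch of what this move implies for existing action scripts (assuming only the AmaContext package changes, as the diff above suggests): the import switches from the executor package to the new spark-runner package, while the calls themselves stay the same.

    // before: AmaContext bundled inside the executor
    // import org.apache.amaterasu.executor.runtime.AmaContext

    // after: AmaContext provided by the spark-runner framework project
    import org.apache.amaterasu.executor.runner.spark.AmaContext

    // "start" and "x" are illustrative action/dataframe names
    val highNoDf = AmaContext.getDataFrame("start", "x").where("_1 > 3")
    highNoDf.show()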



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)