You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@amaterasu.apache.org by GitBox <gi...@apache.org> on 2018/11/25 13:33:44 UTC

[GitHub] roadan closed pull request #37: feature/amaterasu-51: Add support for configuring the path for action level environment path

roadan closed pull request #37: feature/amaterasu-51: Add support for configuring the path for action level environment path
URL: https://github.com/apache/incubator-amaterasu/pull/37
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java b/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java
index 3f3413fa..7a11ddbe 100644
--- a/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java
+++ b/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java
@@ -1,10 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.common.logging;
 
 import org.slf4j.Logger;
 
-/**
- * Created by Eran Bartenstein (p765790) on 5/11/18.
- */
 public abstract class Logging extends KLogging {
     protected Logger log = getLog();
 }
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt
index 24853c53..bb31677d 100644
--- a/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt
@@ -1,8 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.common.configuration.enums
 
-/**
- * Created by Eran Bartenstein on 21/10/18.
- */
 enum class ActionStatus (val value: String) {
     pending("pending"),
     queued("queued"),
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt
index 7e19db2c..3b0720f1 100644
--- a/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt
@@ -26,11 +26,14 @@ import org.apache.amaterasu.common.configuration.enums.ActionStatus
 data class ActionData(var status: ActionStatus = ActionStatus.pending,
                       var name: String= "",
                       var src: String= "",
+                      var config: String= "",
                       var groupId: String= "",
                       var typeId: String= "",
                       var id: String= "",
                       var exports: Map<String, String> = mutableMapOf(),
-                      var nextActionIds: List<String> = listOf()) {
+                      var nextActionIds: MutableList<String> = mutableListOf()) {
     lateinit var errorActionId: String
+    val hasErrorAction: Boolean
+        get() = ::errorActionId.isInitialized
 
 }
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt b/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt
index 2b4e4112..cba04ec1 100644
--- a/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt
@@ -1,10 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.common.logging
 
 import org.slf4j.LoggerFactory
 
-/**
- * Created by Eran Bartenstein on 5/11/18.
- */
 abstract class KLogging {
     protected var log = LoggerFactory.getLogger(this.javaClass.name)
 }
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 6ca9513b..f97e339d 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -66,14 +66,18 @@ dependencies {
     compile 'org.scala-lang:scala-library:2.11.8'
 
     compile project(':common')
+    compile project(':amaterasu-sdk')
 
     compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
     compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
-
+    compile group: 'org.reflections', name: 'reflections', version: '0.9.11'
     compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
+    compile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.15.3'
+    runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store', version: '5.15.3'
+    compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
 
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
     compile "org.jetbrains.kotlin:kotlin-reflect"
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt
new file mode 100644
index 00000000..390058b2
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ArrayNode
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.amaterasu.leader.common.execution.actions.Action
+import org.apache.amaterasu.leader.common.execution.actions.ErrorAction
+import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
+import org.apache.curator.framework.CuratorFramework
+import java.io.File
+import java.util.concurrent.BlockingQueue
+
+object JobParser {
+
+    @JvmStatic
+    fun loadMakiFile(): String = File("repo/maki.yml").readText(Charsets.UTF_8)
+
+    /**
+     * Parses the maki.yml string and creates a job manager
+     *
+     * @param jobId
+     * @param maki a string containing the YAML definition of the job
+     * @param actionsQueue
+     * @param client
+     * @return
+     */
+    @JvmStatic
+    fun parse(jobId: String,
+              maki: String,
+              actionsQueue: BlockingQueue<ActionData>,
+              client: CuratorFramework,
+              attempts: Int): JobManager {
+
+        val mapper = ObjectMapper(YAMLFactory())
+
+        val job = mapper.readTree(maki)
+
+        // loading the job details
+        val manager = JobManager(job.path("job-name").asText(), jobId, actionsQueue, client)
+
+        // iterating the flow list and constructing the job's flow
+        val actions = (job.path("flow") as ArrayNode).toList()
+
+        parseActions(actions, manager, actionsQueue, attempts, null)
+
+        return manager
+    }
+
+    @JvmStatic
+    fun parseActions(actions: List<JsonNode>,
+                     manager: JobManager,
+                     actionsQueue: BlockingQueue<ActionData>,
+                     attempts: Int,
+                     previous: Action?) {
+
+
+        if (actions.isEmpty())
+            return
+
+        val actionData = actions.first()
+
+        val action = parseSequentialAction(
+                actionData,
+                manager.jobId,
+                actionsQueue,
+                manager.client,
+                attempts
+        )
+
+        //updating the list of frameworks setup
+        manager.frameworks.getOrPut(action.data.groupId) { HashSet() }
+                .add(action.data.typeId)
+
+
+        if (!manager.isInitialized) {
+            manager.head = action
+        }
+
+        previous?.let {
+            previous.data.nextActionIds.add(action.actionId)
+        }
+        manager.registerAction(action)
+
+        val errorNode = actionData.path("error")
+
+        if (!errorNode.isMissingNode) {
+
+            val errorAction = parseErrorAction(
+                    errorNode,
+                    manager.jobId,
+                    action.data.id,
+                    actionsQueue,
+                    manager.client
+            )
+
+            action.data.errorActionId = errorAction.data.id
+            manager.registerAction(errorAction)
+
+            //updating the list of frameworks setup
+            manager.frameworks.getOrPut(errorAction.data.groupId) { HashSet() }
+                    .add(errorAction.data.typeId)
+        }
+
+        parseActions(actions.drop(1), manager, actionsQueue, attempts, action)
+
+    }
+
+    @JvmStatic
+    fun parseSequentialAction(action: JsonNode,
+                              jobId: String,
+                              actionsQueue: BlockingQueue<ActionData>,
+                              client: CuratorFramework,
+                              attempts: Int): SequentialAction {
+
+        return SequentialAction(action.path("name").asText(),
+                action.path("file").asText(),
+                action.path("config").asText(),
+                action.path("runner").path("group").asText(),
+                action.path("runner").path("type").asText(),
+                action.path("exports").fields().asSequence().map { it.key to it.value.asText() }.toMap(),
+                jobId,
+                actionsQueue,
+                client,
+                attempts)
+
+    }
+
+    @JvmStatic
+    fun parseErrorAction(action: JsonNode,
+                         jobId: String,
+                         parent: String,
+                         actionsQueue: BlockingQueue<ActionData>,
+                         client: CuratorFramework): ErrorAction {
+
+        return ErrorAction(
+                action.path("name").asText(),
+                action.path("file").asText(),
+                parent,
+                action.path("config").asText(),
+                action.path("runner").path("group").asText(),
+                action.path("runner").path("type").asText(),
+
+                jobId,
+                actionsQueue,
+                client
+        )
+
+    }
+
+}
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
new file mode 100644
index 00000000..b2fe41a7
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.leader.common.execution.actions.Action
+import org.apache.curator.framework.CuratorFramework
+import java.util.concurrent.BlockingQueue
+
+data class JobManager(var name: String = "",
+                      var jobId: String = "",
+                      var executionQueue: BlockingQueue<ActionData>,
+                      var client: CuratorFramework) : KLogging() {
+
+
+    lateinit var head: Action
+
+    // TODO: this is not private due to tests, fix this!!!
+    val registeredActions = HashMap<String, Action>()
+    val frameworks = HashMap<String, HashSet<String>>()
+
+    /**
+     * The start method initiates the job execution by executing the first action.
+     * start mast be called once and by the JobManager only
+     */
+    fun start(): Unit = head.execute()
+
+    val outOfActions: Boolean = registeredActions.filterValues { action ->
+        action.data.status == ActionStatus.pending ||
+                action.data.status == ActionStatus.queued ||
+                action.data.status == ActionStatus.started
+    }.isEmpty()
+
+    /**
+     * getNextActionData returns the data of the next action to be executed if such action
+     * exists
+     *
+     * @return the ActionData of the next action, returns null if no such action exists
+     */
+    fun getNextActionData(): ActionData? {
+
+        val nextAction: ActionData? = executionQueue.poll()
+
+        if (nextAction != null) {
+            registeredActions[nextAction.id]!!.announceStart()
+        }
+
+        return nextAction
+    }
+
+    fun reQueueAction(actionId: String) {
+
+        val action = registeredActions[actionId]
+        executionQueue.put(action!!.data)
+        registeredActions[actionId]!!.announceQueued()
+
+    }
+
+    /**
+     * Registers an action with the job
+     *
+     * @param action
+     */
+    fun registerAction(action: Action) {
+        registeredActions[action.actionId] = action
+    }
+
+    /**
+     * announce the completion of an action and executes the next actions
+     *
+     * @param actionId
+     */
+    fun actionComplete(actionId: String) {
+        val action = registeredActions[actionId]
+        action?.let {
+
+            it.announceComplete()
+
+            action.data.nextActionIds.forEach { id -> registeredActions[id]!!.execute() }
+
+            // we don't need the error action anymore
+            if (it.data.hasErrorAction)
+                registeredActions[action.data.errorActionId]!!.announceCanceled()
+        }
+
+    }
+
+    /**
+     * gets the next action id which can be either the same action or an error action
+     * and if it exist (we have an error action or a retry)
+     *
+     * @param actionId
+     */
+    fun actionFailed(actionId: String, message: String) {
+
+        log.warn(message)
+
+        val action = registeredActions[actionId]
+        val id = action!!.handleFailure(message)
+        if (!id.isEmpty())
+            registeredActions[id]?.execute()
+
+        //delete all future actions
+        cancelFutureActions(action)
+    }
+
+    fun cancelFutureActions(action: Action) {
+
+        if (action.data.status != ActionStatus.failed)
+            action.announceCanceled()
+
+        action.data.nextActionIds.forEach { id ->
+            val registeredAction = registeredActions[id]
+            if (registeredAction != null) {
+                cancelFutureActions(registeredAction)
+            }
+        }
+    }
+
+    /**
+     * announce the start of execution of the action
+     */
+    fun actionStarted(actionId: String) {
+
+        val action = registeredActions[actionId]
+        action?.announceStart()
+
+    }
+
+    fun actionsCount(): Int = executionQueue.size
+
+    val isInitialized: Boolean
+        get() = ::head.isInitialized
+}
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt
index e4f0e3ec..58100658 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt
@@ -19,7 +19,6 @@ package org.apache.amaterasu.leader.common.execution.actions
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.logging.KLogging
-import org.apache.amaterasu.common.logging.Logging
 import org.apache.curator.framework.CuratorFramework
 
 /**
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/ErrorAction.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/ErrorAction.kt
new file mode 100644
index 00000000..a9e6de6d
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/ErrorAction.kt
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.actions
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import java.util.concurrent.BlockingQueue
+
+class ErrorAction(name: String,
+                  src: String,
+                  parent: String,
+                  config: String,
+                  groupId: String,
+                  typeId: String,
+                  jobId: String,
+                  queue: BlockingQueue<ActionData>,
+                  zkClient: CuratorFramework) : SequentialActionBase() {
+
+    init {
+        jobsQueue = queue
+
+        // creating a znode for the action
+        client = zkClient
+        actionPath = client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId/task-$parent-error", ActionStatus.pending.toString().toByteArray())
+        actionId = actionPath.substring(actionPath.indexOf('-') + 1).replace("/", "-")
+
+        this.jobId = jobId
+        data = ActionData(ActionStatus.pending, name, src, config, groupId, typeId, actionId)
+        jobsQueue = queue
+        client = zkClient
+    }
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialAction.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialAction.kt
new file mode 100644
index 00000000..ea864f16
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialAction.kt
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.actions
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import java.util.concurrent.BlockingQueue
+
+class SequentialAction(name: String,
+                       src: String,
+                       config: String,
+                       groupId: String,
+                       typeId: String,
+                       exports: Map<String, String>,
+                       jobId: String,
+                       queue: BlockingQueue<ActionData>,
+                       zkClient: CuratorFramework,
+                       attempts: Int): SequentialActionBase() {
+    init {
+        this.jobsQueue = queue
+
+        // creating a znode for the action
+        client = zkClient
+        actionPath = client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/$jobId/task-", ActionStatus.pending.toString().toByteArray())
+        actionId = actionPath.substring(actionPath.indexOf("task-") + 5)
+
+        this.attempts = attempts
+        this.jobId = jobId
+        val javaExports = exports
+        data = ActionData(ActionStatus.pending, name, src, config, groupId, typeId, actionId, javaExports, arrayListOf())
+        jobsQueue = queue
+        client = zkClient
+
+    }
+}
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialActionBase.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialActionBase.kt
new file mode 100644
index 00000000..49dd6f7e
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialActionBase.kt
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.actions
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import java.util.concurrent.BlockingQueue
+
+open class SequentialActionBase : Action() {
+
+
+    var jobId: String = ""
+    lateinit var jobsQueue: BlockingQueue<ActionData>
+    var attempts: Int = 2
+    private var attempt: Int = 1
+
+    override fun execute() {
+
+        try {
+
+            announceQueued()
+            jobsQueue.add(data)
+
+        }
+        catch(e: Exception) {
+
+            //TODO: this will not invoke the error action
+            e.message?.let{ handleFailure(it) }
+
+        }
+
+    }
+
+    override fun handleFailure(message: String): String {
+
+        println("Part ${data.name} of group ${data.groupId} and of type ${data.typeId} failed on attempt $attempt with message: $message")
+        attempt += 1
+
+        var result: String
+        if (attempt <= attempts) {
+            result = data.id
+        }
+        else {
+            announceFailure()
+            println("===> moving to err action ${data.errorActionId}")
+            data.status = ActionStatus.failed
+            result = data.errorActionId
+        }
+        return result
+    }
+
+}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
similarity index 96%
rename from leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
rename to leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
index 76302211..3f599233 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
+++ b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.execution.frameworks
+package org.apache.amaterasu.leader.common.execution.frameworks
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.logging.Logging
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.scala
similarity index 97%
rename from leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
rename to leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.scala
index b3ffaadd..77b3c05a 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala
+++ b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.utilities
+package org.apache.amaterasu.leader.common.utilities
 
 import javax.jms.{Message, MessageListener, TextMessage}
 import net.liftweb.json._
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
index e4179055..236f3eec 100755
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
+++ b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 package org.apache.amaterasu.leader.common.utilities
-import scala.collection.JavaConverters._
+
 import java.io.{File, FileInputStream}
 import java.nio.file.{Files, Paths}
 
@@ -42,6 +42,10 @@ object DataLoader extends Logging {
   val ymlMapper = new ObjectMapper(new YAMLFactory())
   ymlMapper.registerModule(DefaultScalaModule)
 
+  def getTaskDataBytes(actionData: ActionData, env: String): Array[Byte] = {
+    mapper.writeValueAsBytes(getTaskData(actionData, env))
+  }
+
   def getTaskData(actionData: ActionData, env: String): TaskData = {
     val srcFile = actionData.getSrc
     val src = Source.fromFile(s"repo/src/$srcFile").mkString
@@ -54,14 +58,14 @@ object DataLoader extends Logging {
     TaskData(src, envData, actionData.getGroupId, actionData.getTypeId, exports)
   }
 
-  def getTaskDataBytes(actionData: ActionData, env: String): Array[Byte] = {
-    mapper.writeValueAsBytes(getTaskData(actionData, env))
-  }
-
   def getTaskDataString(actionData: ActionData, env: String): String = {
     mapper.writeValueAsString(getTaskData(actionData, env))
   }
 
+  def getExecutorDataBytes(env: String, clusterConf: ClusterConfig): Array[Byte] = {
+    mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
+  }
+
   def getExecutorData(env: String, clusterConf: ClusterConfig): ExecData = {
 
     // loading the job configuration
@@ -85,22 +89,18 @@ object DataLoader extends Logging {
     ExecData(envData, depsData, pyDepsData, config)
   }
 
-  def getExecutorDataBytes(env: String, clusterConf: ClusterConfig): Array[Byte] = {
-    mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
-  }
-
-  def getExecutorDataString(env: String, clusterConf: ClusterConfig): String = {
-    mapper.writeValueAsString(getExecutorData(env, clusterConf))
-  }
-
   def yamlToMap(file: File): (String, Map[String, Any]) = {
 
     val yaml = new Yaml()
     val conf = yaml.load(new FileInputStream(file)).asInstanceOf[java.util.Map[String, Any]].asScala.toMap
 
-    (file.getName.replace(".yml",""), conf)
+    (file.getName.replace(".yml", ""), conf)
+  }
+
+  def getExecutorDataString(env: String, clusterConf: ClusterConfig): String = {
+    mapper.writeValueAsString(getExecutorData(env, clusterConf))
   }
 
 }
 
-class ConfMap[String,  T <: ConfMap[String, T]] extends mutable.ListMap[String, Either[String, T]]
\ No newline at end of file
+class ConfMap[String, T <: ConfMap[String, T]] extends mutable.ListMap[String, Either[String, T]]
\ No newline at end of file
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala
index cabe0e53..c9df942a 100644
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala
+++ b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala
@@ -24,7 +24,7 @@ object MemoryFormatParser {
     if (lower.contains("mb")) {
       result = lower.replace("mb", "").toInt
     } else if (lower.contains("gb") | lower.contains("g")) {
-      result = lower.replace("g", "").replace("b","").toInt * 1024
+      result = lower.replace("g", "").replace("b", "").toInt * 1024
     } else {
       result = lower.toInt
     }
diff --git a/leader/build.gradle b/leader/build.gradle
index dc244fc1..0138fe63 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -20,7 +20,6 @@ plugins {
     id 'scala'
     id 'org.jetbrains.kotlin.jvm'
     id 'java'
-
 }
 
 sourceCompatibility = 1.8
@@ -60,12 +59,9 @@ dependencies {
     compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
     compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6'
     compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
-    compile group: 'org.reflections', name: 'reflections', version: '0.9.11'
-    compile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.15.3'
     compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
     compile "org.jetbrains.kotlin:kotlin-reflect"
-    runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store', version: '5.15.3'
 
     testCompile project(':common')
     testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
@@ -92,7 +88,7 @@ sourceSets {
             srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala']
         }
         java {
-            srcDirs = []
+            srcDirs = ['src/main/java']
         }
     }
 }
@@ -120,6 +116,20 @@ task copyToHome() {
 compileKotlin{
     kotlinOptions.jvmTarget = "1.8"
 }
+
 compileTestKotlin {
     kotlinOptions.jvmTarget = "1.8"
 }
+
+compileTestScala {
+    dependsOn compileScala
+}
+
+compileScala {
+    dependsOn compileJava
+    classpath += files(compileJava.destinationDir) + files(compileKotlin.destinationDir)
+}
+
+compileJava {
+    dependsOn compileKotlin
+}
\ No newline at end of file
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
index 5e861881..3b0e8c24 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
@@ -18,8 +18,8 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.amaterasu.common.configuration.ClusterConfig;
-import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory;
-import org.apache.amaterasu.leader.utilities.ActiveReportListener;
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory;
+import org.apache.amaterasu.leader.common.utilities.ActiveReportListener;
 import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala b/leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala
deleted file mode 100755
index d9be4dd7..00000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.common.actions
-
-import java.util
-import java.util.concurrent.BlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.execution.actions.Action
-import org.apache.curator.framework.CuratorFramework
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-
-class SequentialAction extends Action {
-
-  var jobId: String = _
-  var jobsQueue: BlockingQueue[ActionData] = _
-  var attempts: Int = 2
-  var attempt: Int = 1
-
-  def execute(): Unit = {
-
-    try {
-
-      announceQueued
-      jobsQueue.add(data)
-
-    }
-    catch {
-
-      //TODO: this will not invoke the error action
-      case e: Exception => handleFailure(e.getMessage)
-
-    }
-
-  }
-
-  override def handleFailure(message: String): String = {
-
-    println(s"Part ${data.getName} of group ${data.getGroupId} and of type ${data.getTypeId} failed on attempt $attempt with message: $message")
-    attempt += 1
-
-    if (attempt <= attempts) {
-      data.getId
-    }
-    else {
-      announceFailure()
-      println(s"===> moving to err action ${data.errorActionId}")
-      data.setStatus ( ActionStatus.failed )
-      data.errorActionId
-    }
-
-  }
-
-}
-
-object SequentialAction {
-
-  def apply(name: String,
-            src: String,
-            groupId: String,
-            typeId: String,
-            exports: Map[String, String],
-            jobId: String,
-            queue: BlockingQueue[ActionData],
-            zkClient: CuratorFramework,
-            attempts: Int): SequentialAction = {
-
-    val action = new SequentialAction()
-
-    action.jobsQueue = queue
-
-    // creating a znode for the action
-    action.client = zkClient
-    action.actionPath = action.client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(s"/$jobId/task-", ActionStatus.pending.toString.getBytes())
-    action.actionId = action.actionPath.substring(action.actionPath.indexOf("task-") + 5)
-
-    action.attempts = attempts
-    action.jobId = jobId
-    val javaExports = exports.asJava
-    action.data = new ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, javaExports, new util.ArrayList[String]())
-    action.jobsQueue = queue
-    action.client = zkClient
-
-    action
-  }
-
-}
-
-object ErrorAction {
-
-  def apply(name: String,
-            src: String,
-            parent: String,
-            groupId: String,
-            typeId: String,
-            jobId: String,
-            queue: BlockingQueue[ActionData],
-            zkClient: CuratorFramework): SequentialAction = {
-
-    val action = new SequentialAction()
-
-    action.jobsQueue = queue
-
-    // creating a znode for the action
-    action.client = zkClient
-    action.actionPath = action.client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/task-$parent-error", ActionStatus.pending.toString.getBytes())
-    action.actionId = action.actionPath.substring(action.actionPath.indexOf('-') + 1).replace("/", "-")
-
-    action.jobId = jobId
-    action.data = new ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, new util.HashMap[String, String](), new util.ArrayList[String]())
-    action.jobsQueue = queue
-    action.client = zkClient
-
-    action
-
-  }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
deleted file mode 100755
index e08489cd..00000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.dsl
-
-import java.util.concurrent.BlockingQueue
-
-import com.fasterxml.jackson.databind.node.ArrayNode
-import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.execution.JobManager
-import org.apache.amaterasu.leader.common.actions.{ErrorAction, SequentialAction}
-import org.apache.amaterasu.leader.common.execution.actions.Action
-import org.apache.curator.framework.CuratorFramework
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.io.Source
-
-/**
-  * The JobParser class is in charge of parsing the maki.yaml file which
-  * describes the workflow of an amaterasu job
-  */
-object JobParser {
-
-  def loadMakiFile(): String = {
-
-    Source.fromFile("repo/maki.yml").mkString
-
-  }
-
-  /**
-    * Parses the maki.yml string and creates a job manager
-    *
-    * @param jobId
-    * @param maki a string containing the YAML definition of the job
-    * @param actionsQueue
-    * @param client
-    * @return
-    */
-  def parse(jobId: String,
-            maki: String,
-            actionsQueue: BlockingQueue[ActionData],
-            client: CuratorFramework,
-            attempts: Int): JobManager = {
-
-    val mapper = new ObjectMapper(new YAMLFactory())
-
-    val job = mapper.readTree(maki)
-
-    // loading the job details
-    val manager = JobManager(jobId, job.path("job-name").asText, actionsQueue, client)
-
-    // iterating the flow list and constructing the job's flow
-    val actions = job.path("flow").asInstanceOf[ArrayNode].asScala.toSeq
-
-    parseActions(actions, manager, actionsQueue, attempts, null)
-
-    manager
-  }
-
-  /**
-    * parseActions is a recursive function, for building the workflow of
-    * the job
-    * God, I miss Clojure
-    *
-    * @param actions  a seq containing the definitions of all the actions
-    * @param manager  the job manager for the job
-    * @param actionsQueue
-    * @param previous the previous action, this is used in order to add the current action
-    *                 to the nextActionIds
-    */
-  def parseActions(actions: Seq[JsonNode],
-                   manager: JobManager,
-                   actionsQueue: BlockingQueue[ActionData],
-                   attempts: Int,
-                   previous: Action): Unit = {
-
-    if (actions.isEmpty)
-      return
-
-    val actionData = actions.head
-
-    val action = parseSequentialAction(
-      actionData,
-      manager.jobId,
-      actionsQueue,
-      manager.client,
-      attempts
-    )
-
-    //updating the list of frameworks setup
-    manager.frameworks.getOrElseUpdate(action.data.getGroupId,
-                                       new mutable.HashSet[String]())
-                                       .add(action.data.getTypeId)
-
-
-    if (manager.head == null) {
-      manager.head = action
-    }
-
-    if (previous != null) {
-      previous.data.getNextActionIds.add(action.actionId)
-    }
-    manager.registerAction(action)
-
-    val errorNode = actionData.path("error")
-
-    if (!errorNode.isMissingNode) {
-
-      val errorAction = parseErrorAction(
-        errorNode,
-        manager.jobId,
-        action.data.getId,
-        actionsQueue,
-        manager.client
-      )
-
-      action.data.errorActionId = errorAction.data.getId
-      manager.registerAction(errorAction)
-
-      //updating the list of frameworks setup
-      manager.frameworks.getOrElseUpdate(errorAction.data.getGroupId,
-        new mutable.HashSet[String]())
-        .add(errorAction.data.getTypeId)
-    }
-
-    parseActions(actions.tail, manager, actionsQueue, attempts, action)
-
-  }
-
-  def parseSequentialAction(action: JsonNode,
-                            jobId: String,
-                            actionsQueue: BlockingQueue[ActionData],
-                            client: CuratorFramework,
-                            attempts: Int): SequentialAction = {
-
-    SequentialAction(action.path("name").asText,
-      action.path("file").asText,
-      action.path("runner").path("group").asText,
-      action.path("runner").path("type").asText,
-      action.path("exports").fields().asScala.toSeq.map(e => (e.getKey, e.getValue.asText())).toMap,
-      jobId,
-      actionsQueue,
-      client,
-      attempts)
-  }
-
-  def parseErrorAction(action: JsonNode,
-                       jobId: String,
-                       parent: String,
-                       actionsQueue: BlockingQueue[ActionData],
-                       client: CuratorFramework): SequentialAction = {
-
-    ErrorAction(
-      action.path("name").asText,
-      action.path("file").asText,
-      parent,
-      action.path("group").asText,
-      action.path("type").asText,
-      jobId,
-      actionsQueue,
-      client
-    )
-
-  }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
index 234070d8..49df0099 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.BlockingQueue
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.dsl.GitUtil
-import org.apache.amaterasu.leader.dsl.JobParser
+import org.apache.amaterasu.leader.common.dsl.{GitUtil, JobParser}
+import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
@@ -102,4 +102,4 @@ object JobLoader extends Logging {
 
   }
 
-}
\ No newline at end of file
+}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
deleted file mode 100755
index 70642dbf..00000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.execution
-
-import java.util.concurrent.BlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.execution.actions.Action
-import org.apache.curator.framework.CuratorFramework
-
-import scala.collection.concurrent.TrieMap
-import scala.collection.mutable
-
-/**
-  * The JobManager manages the lifecycle of a job. It queues new actions for execution,
-  * tracks the state of actions and is in charge of communication with the underlying
-  * cluster management framework (mesos)
-  */
-class JobManager extends Logging {
-
-  var name: String = _
-  var jobId: String = _
-  var client: CuratorFramework = _
-  var head: Action = _
-
-  // TODO: this is not private due to tests, fix this!!!
-  val registeredActions = new TrieMap[String, Action]
-  val frameworks = new TrieMap[String, mutable.HashSet[String]]
-  private var executionQueue: BlockingQueue[ActionData] = _
-
-  /**
-    * The start method initiates the job execution by executing the first action.
-    * start mast be called once and by the JobManager only
-    */
-  def start(): Unit = {
-
-    head.execute()
-
-  }
-
-  def outOfActions: Boolean = !registeredActions.values.exists(a => a.data.getStatus == ActionStatus.pending ||
-    a.data.getStatus == ActionStatus.queued ||
-    a.data.getStatus == ActionStatus.started)
-  /**
-    * getNextActionData returns the data of the next action to be executed if such action
-    * exists
-    *
-    * @return the ActionData of the next action, returns null if no such action exists
-    */
-  def getNextActionData: ActionData = {
-
-    val nextAction: ActionData = executionQueue.poll()
-
-    if (nextAction != null) {
-      registeredActions(nextAction.getId).announceStart
-    }
-
-    nextAction
-  }
-
-  def reQueueAction(actionId: String): Unit = {
-
-    val action = registeredActions(actionId)
-    executionQueue.put(action.data)
-    registeredActions(actionId).announceQueued
-
-  }
-
-  /**
-    * Registers an action with the job
-    *
-    * @param action
-    */
-  def registerAction(action: Action): Unit = {
-
-    registeredActions.put(action.actionId, action)
-
-  }
-
-  /**
-    * announce the completion of an action and executes the next actions
-    *
-    * @param actionId
-    */
-  def actionComplete(actionId: String): Unit = {
-
-    val action = registeredActions.get(actionId).get
-    action.announceComplete
-    action.data.getNextActionIds.toArray.foreach(id => registeredActions.get(id.toString).get.execute())
-
-    // we don't need the error action anymore
-    if (action.data.errorActionId != null)
-      registeredActions.get(action.data.errorActionId).get.announceCanceled
-  }
-
-  /**
-    * gets the next action id which can be either the same action or an error action
-    * and if it exist (we have an error action or a retry)
-    *
-    * @param actionId
-    */
-  def actionFailed(actionId: String, message: String): Unit = {
-
-    log.warn(message)
-
-    val action = registeredActions.get(actionId).get
-    val id = action.handleFailure(message)
-    if (id != null)
-      registeredActions.get(id).get.execute()
-
-    //delete all future actions
-    cancelFutureActions(action)
-  }
-
-  def cancelFutureActions(action: Action): Unit = {
-
-    if (action.data.getStatus != ActionStatus.failed)
-      action.announceCanceled
-
-    action.data.getNextActionIds.toArray.foreach(id =>
-      cancelFutureActions(registeredActions.get(id.toString).get))
-  }
-
-  /**
-    * announce the start of execution of the action
-    */
-  def actionStarted(actionId: String): Unit = {
-
-    val action = registeredActions.get(actionId).get
-    action.announceStart
-
-  }
-
-  def actionsCount(): Int = {
-    executionQueue.size()
-  }
-}
-
-object JobManager {
-
-  /**
-    * The apply method starts the job execution once the job is created from the maki.yaml file
-    * it is in charge of creating the internal flow map, setting up ZooKeeper and executing
-    * the first action
-    * If the job execution is resumed (a job that was stooped) the init method will restore the
-    * state of the job from ZooKepper
-    *
-    * @param jobId
-    * @param name
-    * @param jobsQueue
-    * @param client
-    * @return
-    */
-  def apply(
-    jobId: String,
-    name: String,
-    jobsQueue: BlockingQueue[ActionData],
-    client: CuratorFramework
-  ): JobManager = {
-
-    val manager = new JobManager()
-    manager.name = name
-    manager.executionQueue = jobsQueue
-    manager.jobId = jobId
-    manager.client = client
-
-    manager
-
-  }
-
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index bcd7923d..751a3935 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -34,9 +34,10 @@ import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel
 import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType}
 import org.apache.amaterasu.leader.common.configuration.ConfigManager
+import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.amaterasu.leader.common.utilities.DataLoader
-import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
-import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.execution.JobLoader
 import org.apache.amaterasu.leader.utilities.HttpServer
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
@@ -158,16 +159,16 @@ class JobScheduler extends AmaterasuScheduler {
             val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
 
             // setting up the configuration files for the container
-            val envYaml = configManager.getActionConfigContent(actionData.getName, "") //TODO: replace with the value in actionData.config
-            writeConfigFile(envYaml, jobManager.jobId, actionData.getName, "env.yaml")
+            val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
+            writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
 
             val dataStores = DataLoader.getTaskData(actionData, env).exports
             val writer = new StringWriter()
             yamlMapper.writeValue(writer, dataStores)
             val dataStoresYaml = writer.toString
-            writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.getName, "datastores.yaml")
+            writeConfigFile(dataStoresYaml, jobManager.getJobId, actionData.getName, "datastores.yaml")
 
-            writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName: ${actionData.getName}", jobManager.jobId, actionData.getName, "runtime.yaml")
+            writeConfigFile(s"jobId: ${jobManager.getJobId}\nactionName: ${actionData.getName}", jobManager.getJobId, actionData.getName, "runtime.yaml")
 
             offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
 
@@ -197,12 +198,12 @@ class JobScheduler extends AmaterasuScheduler {
               //creating the command
 
               // TODO: move this into the runner provider somehow
-              copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.jobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING)
+              copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING)
 
-              println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}")
+              println(s"===> ${runnerProvider.getCommand(jobManager.getJobId, actionData, env, executorId, "")}")
               val command = CommandInfo
                 .newBuilder
-                .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, ""))
+                .setValue(runnerProvider.getCommand(jobManager.getJobId, actionData, env, executorId, ""))
                 .addUris(URI.newBuilder
                   .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
                   .setExecutable(false)
@@ -211,21 +212,21 @@ class JobScheduler extends AmaterasuScheduler {
 
                 // Getting env.yaml
                 command.addUris(URI.newBuilder
-                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/env.yaml")
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.getJobId}/${actionData.getName}/env.yaml")
                   .setExecutable(false)
                   .setExtract(true)
                   .build())
 
                 // Getting datastores.yaml
                 command.addUris(URI.newBuilder
-                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/datastores.yaml")
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.getJobId}/${actionData.getName}/datastores.yaml")
                   .setExecutable(false)
                   .setExtract(true)
                   .build())
 
                 // Getting runtime.yaml
                 command.addUris(URI.newBuilder
-                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/runtime.yaml")
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.getJobId}/${actionData.getName}/runtime.yaml")
                   .setExecutable(false)
                   .setExtract(true)
                   .build())
@@ -245,7 +246,7 @@ class JobScheduler extends AmaterasuScheduler {
                 .build()))
 
               // Getting action specific resources
-              runnerProvider.getActionResources(jobManager.jobId, actionData).foreach(r => command.addUris(URI.newBuilder
+              runnerProvider.getActionResources(jobManager.getJobId, actionData).foreach(r => command.addUris(URI.newBuilder
                 .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
                 .setExecutable(false)
                 .setExtract(false)
@@ -297,8 +298,8 @@ class JobScheduler extends AmaterasuScheduler {
 
             driver.launchTasks(Collections.singleton(offer.getId), Collections.singleton(actionTask))
           }
-          else if (jobManager.outOfActions) {
-            log.info(s"framework ${jobManager.jobId} execution finished")
+          else if (jobManager.getOutOfActions) {
+            log.info(s"framework ${jobManager.getJobId} execution finished")
 
             val repo = new File("repo/")
             repo.delete()
@@ -355,7 +356,7 @@ class JobScheduler extends AmaterasuScheduler {
 
     jobManager.start()
 
-    createJobDir(jobManager.jobId)
+    createJobDir(jobManager.getJobId)
 
   }
 
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 23700f80..099ea482 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -21,16 +21,18 @@ import java.net.{InetAddress, ServerSocket}
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-import javax.jms.Session
 
+import javax.jms.Session
 import org.apache.activemq.ActiveMQConnectionFactory
 import org.apache.activemq.broker.BrokerService
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
-import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
-import org.apache.amaterasu.leader.utilities.{ActiveReportListener, Args}
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.common.utilities.ActiveReportListener
+import org.apache.amaterasu.leader.execution.JobLoader
+import org.apache.amaterasu.leader.utilities.Args
 import org.apache.curator.framework.recipes.barriers.DistributedBarrier
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
@@ -122,17 +124,17 @@ class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
 
     // now that the job was initiated, the curator client is started and we can
     // register the broker's address
-    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.jobId}/broker")
-    client.setData().forPath(s"/${jobManager.jobId}/broker", address.getBytes)
+    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.getJobId}/broker")
+    client.setData().forPath(s"/${jobManager.getJobId}/broker", address.getBytes)
 
     // once the broker is registered, we can remove the barrier so clients can connect
-    log.info(s"/${jobManager.jobId}-report-barrier")
-    val barrier = new DistributedBarrier(client, s"/${jobManager.jobId}-report-barrier")
+    log.info(s"/${jobManager.getJobId}-report-barrier")
+    val barrier = new DistributedBarrier(client, s"/${jobManager.getJobId}-report-barrier")
     barrier.removeBarrier()
 
-    setupMessaging(jobManager.jobId)
+    setupMessaging(jobManager.getJobId)
 
-    log.info(s"Job ${jobManager.jobId} initiated with ${jobManager.registeredActions.size} actions")
+    log.info(s"Job ${jobManager.getJobId} initiated with ${jobManager.getRegisteredActions.size} actions")
 
     jarPath = new Path(config.YARN.hdfsJarsPath)
 
@@ -160,13 +162,13 @@ class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
     log.info("Max mem capability of resources in this cluster " + maxMem)
     val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
     log.info("Max vcores capability of resources in this cluster " + maxVCores)
-    log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.registeredActions.size}")
+    log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}")
 
     // Resource requirements for worker containers
     this.capability = Records.newRecord(classOf[Resource])
     val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
 
-    while (!jobManager.outOfActions) {
+    while (!jobManager.getOutOfActions) {
       val actionData = jobManager.getNextActionData
       if (actionData != null) {
 
@@ -248,7 +250,7 @@ class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
         val framework = frameworkFactory.getFramework(actionData.getGroupId)
         val runnerProvider = framework.getRunnerProvider(actionData.getTypeId)
         val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-        val commands: List[String] = List(runnerProvider.getCommand(jobManager.jobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address))
+        val commands: List[String] = List(runnerProvider.getCommand(jobManager.getJobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address))
 
         log.info("Running container id {}.", container.getId.getContainerId)
         log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
@@ -385,17 +387,17 @@ class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
       }
     }
 
-    if (jobManager.outOfActions) {
+    if (jobManager.getOutOfActions) {
       log.info("Finished all tasks successfully! Wow!")
       jobManager.actionsCount()
       stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS")
     } else {
-      log.info(s"jobManager.registeredActions.size: ${jobManager.registeredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
+      log.info(s"jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
     }
   }
 
   override def getProgress: Float = {
-    jobManager.registeredActions.size.toFloat / completedContainersAndTaskIds.size
+    jobManager.getRegisteredActions.size.toFloat / completedContainersAndTaskIds.size
   }
 
   override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
index 379dd1b7..bade9483 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
@@ -23,8 +23,8 @@ import java.util.concurrent.ConcurrentHashMap
 import com.google.gson.Gson
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.amaterasu.leader.common.utilities.DataLoader
-import org.apache.amaterasu.leader.execution.JobManager
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
 import org.apache.hadoop.yarn.util.Records
@@ -87,7 +87,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
   }
 
   override def getProgress: Float = {
-    jobManager.registeredActions.size.toFloat / completedContainersAndTaskIds.size
+    jobManager.getRegisteredActions.size.toFloat / completedContainersAndTaskIds.size
   }
 
   override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
@@ -108,7 +108,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
                          | java -cp executor.jar:spark-${config.Webserver.sparkVersion}/lib/*
                          | -Dscala.usejavacp=true
                          | -Djava.library.path=/usr/lib org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher
-                         | ${jobManager.jobId} ${config.master} ${actionData.getName} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin
+                         | ${jobManager.getJobId} ${config.master} ${actionData.getName} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin
         ctx.setCommands(Collections.singletonList(command))
 
         ctx.setLocalResources(Map[String, LocalResource] (
diff --git a/leader/src/test/resources/simple-maki.yml b/leader/src/test/resources/simple-maki.yml
index 151c89bb..f640e112 100755
--- a/leader/src/test/resources/simple-maki.yml
+++ b/leader/src/test/resources/simple-maki.yml
@@ -22,13 +22,16 @@ flow:
       group: spark
       type: scala
       src: simple-spark.scala
+      config: start-cfg.yaml
     - name: step2
       group: spark
       type: scala
       src: file2.scala
+      config: step2-cfg.yaml
       error:
         name: error-action
         group: spark
         type: scala
         src: error.scala
-...
\ No newline at end of file
+        config: error-cfg.yaml
+...
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
index 197c703d..806bae9a 100755
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
@@ -21,12 +21,12 @@ import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.actions.SequentialAction
+import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
 import org.apache.zookeeper.CreateMode
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.{DoNotDiscover, FlatSpec, Matchers}
 
 import scala.collection.JavaConverters._
 
@@ -36,7 +36,7 @@ class ActionStatusTests extends FlatSpec with Matchers {
   val retryPolicy = new ExponentialBackoffRetry(1000, 3)
   val server = new TestingServer(2181, true)
   val jobId = s"job_${System.currentTimeMillis}"
-  val data = new ActionData(ActionStatus.pending, "test_action", "start.scala", "spark","scala", "0000001", new util.HashMap() , List[String]().asJava)
+  val data = new ActionData(ActionStatus.pending, "test_action", "start.scala", "", "spark","scala", "0000001", new util.HashMap() , List[String]().asJava)
 
   "an Action" should "queue it's ActionData int the job queue when executed" in {
 
@@ -47,7 +47,7 @@ class ActionStatusTests extends FlatSpec with Matchers {
     client.start()
 
     client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
-    val action = SequentialAction(data.getName, data.getSrc, data.getGroupId, data.getTypeId, Map.empty, jobId, queue, client, 1)
+    val action = new SequentialAction(data.getName, data.getSrc, "", data.getGroupId, data.getTypeId, Map.empty[String, String].asJava, jobId, queue, client, 1)
 
     action.execute()
     queue.peek().getName should be(data.getName)
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
index ef47cc17..469335cc 100755
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
@@ -19,7 +19,7 @@ package org.apache.amaterasu.common.execution
 import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.dsl.JobParser
+import org.apache.amaterasu.leader.common.dsl.JobParser
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
@@ -40,16 +40,25 @@ class JobExecutionTests extends FlatSpec with Matchers {
   val queue = new LinkedBlockingQueue[ActionData]()
 
   // this will be performed by the job bootstraper
+
   client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
   //  client.setData().forPath(s"/$jobId/src",src.getBytes)
   //  client.setData().forPath(s"/$jobId/branch", branch.getBytes)
 
+
   val job = JobParser.parse(jobId, yaml, queue, client, 1)
 
+  queue.toArray.foreach(it => {
+    val d = it.asInstanceOf[ActionData]
+    println(s"+++++++> ${d.getName}")
+    println(s"  +++++++> ${d.getErrorActionId}")
+  })
+
   "a job" should "queue the first action when the JobManager.start method is called " in {
 
     job.start
-    queue.peek.getName should be ("start")
+
+    queue.peek.getName should be("start")
 
     // making sure that the status is reflected in zk
     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
@@ -59,8 +68,8 @@ class JobExecutionTests extends FlatSpec with Matchers {
 
   it should "return the start action when calling getNextAction and dequeue it" in {
 
-    job.getNextActionData.getName should be ("start")
-    queue.size should be (0)
+    job.getNextActionData.getName should be("start")
+    queue.size should be(0)
 
     // making sure that the status is reflected in zk
     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
@@ -74,13 +83,14 @@ class JobExecutionTests extends FlatSpec with Matchers {
 
     // making sure that the status is reflected in zk
     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-    new String(actionStatus) should be("complete")
+
+    new String(new String(actionStatus)) should be("complete")
 
   }
 
   "the next step2 job" should "be queued as a result of the completion" in {
 
-    queue.peek.getName should be ("step2")
+    queue.peek.getName should be("step2")
 
     // making sure that the status is reflected in zk
     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
@@ -92,7 +102,7 @@ class JobExecutionTests extends FlatSpec with Matchers {
 
     val data = job.getNextActionData
 
-    data.getName should be ("step2")
+    data.getName should be("step2")
 
     // making sure that the status is reflected in zk
     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
@@ -102,7 +112,7 @@ class JobExecutionTests extends FlatSpec with Matchers {
   it should "be marked as failed when JobManager. is called" in {
 
     job.actionFailed("0000000001", "test failure")
-    queue.peek.getName should be ("error-action")
+    queue.peek.getName should be("error-action")
 
     // making sure that the status is reflected in zk
     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
index 13685f91..1b8581f9 100755
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
@@ -19,7 +19,7 @@ package org.apache.amaterasu.common.execution
 import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.dsl.JobParser
+import org.apache.amaterasu.leader.common.dsl.JobParser
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
@@ -46,19 +46,29 @@ class JobParserTests extends FlatSpec with Matchers {
 
   "JobParser" should "parse the simple-maki.yml" in {
 
-    job.name should be("amaterasu-test")
+    job.getName should be("amaterasu-test")
 
   }
 
   //TODO: I suspect this test is not indicative, and that order is assured need to verify this
   it should "also have two actions in the right order" in {
 
-    job.registeredActions.size should be(3)
+    job.getRegisteredActions.size should be(3)
 
-    job.registeredActions.get("0000000000").get.data.getName should be("start")
-    job.registeredActions.get("0000000001").get.data.getName should be("step2")
-    job.registeredActions.get("0000000001-error").get.data.getName should be("error-action")
+    job.getRegisteredActions.get("0000000000").data.getName should be("start")
+    job.getRegisteredActions.get("0000000001").data.getName should be("step2")
+    job.getRegisteredActions.get("0000000001-error").data.getName should be("error-action")
 
   }
 
-}
\ No newline at end of file
+  it should "Action 'config' is parsed successfully" in {
+
+    job.getRegisteredActions.size should be(3)
+
+    job.getRegisteredActions.get("0000000000").data.getConfig should be("start-cfg.yaml")
+    job.getRegisteredActions.get("0000000001").data.getConfig should be("step2-cfg.yaml")
+    job.getRegisteredActions.get("0000000001-error").data.getConfig should be("error-cfg.yaml")
+
+  }
+
+}
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
index 64887ab6..b343a6b4 100755
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
@@ -20,12 +20,13 @@ import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.amaterasu.leader.execution.JobLoader
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
 import org.apache.zookeeper.CreateMode
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import org.scalatest.{BeforeAndAfterEach, DoNotDiscover, FlatSpec, Matchers}
 
 import scala.io.Source
 
@@ -80,4 +81,4 @@ class JobRestoreTests extends FlatSpec with Matchers with BeforeAndAfterEach {
 
     queue.peek.getName should be("start")
   }
-}
\ No newline at end of file
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services