You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@amaterasu.apache.org by GitBox <gi...@apache.org> on 2018/10/13 06:00:18 UTC

[GitHub] nadav-har-tzvi closed pull request #34: Amaterasu 49 - mesos implementation

nadav-har-tzvi closed pull request #34: Amaterasu 49 - mesos implementation
URL: https://github.com/apache/incubator-amaterasu/pull/34
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 94b5dad..06a07e8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,20 +19,25 @@ plugins {
 }
 
 apply plugin: 'distribution'
+apply plugin: 'project-report'
+
+htmlDependencyReport {
+    projects = project.allprojects
+}
 
 allprojects {
     group 'org.apache.amaterasu'
     version '0.2.0-incubating-rc4'
 }
 
-task copyLeagalFiles(type: Copy) {
+task copyLegalFiles(type: Copy) {
     from "./DISCLAIMER", "./LICENSE", "./NOTICE"
     into "${buildDir}/amaterasu"
 }
 
 task buildHomeDir() {
     dependsOn subprojects.collect { getTasksByName('copyToHome', true) }
-    dependsOn copyLeagalFiles
+    dependsOn copyLegalFiles
 }
 
 distributions {
diff --git a/frameworks/spark/dispatcher/build.gradle b/frameworks/spark/dispatcher/build.gradle
index a95d958..a2bf9fe 100644
--- a/frameworks/spark/dispatcher/build.gradle
+++ b/frameworks/spark/dispatcher/build.gradle
@@ -39,6 +39,7 @@ dependencies {
     compile project(':common')
     compile project(':leader-common')
     compile project(':amaterasu-sdk')
+    compile 'com.uchuhimo:konf:0.11'
 }
 
 task copyToHomeBin(type: Copy) {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index ac442d5..79bd080 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -103,4 +103,6 @@ class SparkSetupProvider extends FrameworkSetupProvider {
   override def getRunnerProvider(runnerId: String): RunnerSetupProvider = {
     runnerProviders(runnerId)
   }
+
+  override def getConfigurationItems: Array[String] = Array("sparkConfiguration", "sparkExecutor") // Spark config item names; NOTE(review): presumably passed to ConfigManager as frameworkItems, confirm against the caller
 }
\ No newline at end of file
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 6ffa237..1948b90 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 38185b0..a95009c 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,5 @@
-#Sun Jun 10 14:41:17 AEST 2018
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-bin.zip
diff --git a/gradlew b/gradlew
index 3efb0e9..cccdd3d 100755
--- a/gradlew
+++ b/gradlew
@@ -1,20 +1,4 @@
-#!/usr/bin/env bash
-#
-#    Licensed to the Apache Software Foundation (ASF) under one or more
-#    contributor license agreements.  See the NOTICE file distributed with
-#    this work for additional information regarding copyright ownership.
-#    The ASF licenses this file to You under the Apache License, Version 2.0
-#    (the "License"); you may not use this file except in compliance with
-#    the License.  You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS,
-#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#    See the License for the specific language governing permissions and
-#    limitations under the License.
-#
+#!/usr/bin/env sh
 
 ##############################################################################
 ##
@@ -49,11 +33,11 @@ DEFAULT_JVM_OPTS=""
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD="maximum"
 
-warn ( ) {
+warn () {
     echo "$*"
 }
 
-die ( ) {
+die () {
     echo
     echo "$*"
     echo
@@ -170,16 +154,19 @@ if $cygwin ; then
     esac
 fi
 
-# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
-function splitJvmOpts() {
-    JVM_OPTS=("$@")
+# Escape application args
+save () {
+    for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
+    echo " "
 }
-eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
-JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+APP_ARGS=$(save "$@")
+
+# Collect all arguments for the java command, following the shell quoting and substitution rules
+eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
 
 # by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
-if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then
+if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
   cd "$(dirname "$0")"
 fi
 
-exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
+exec "$JAVACMD" "$@"
diff --git a/gradlew.bat b/gradlew.bat
index f664a3f..e95643d 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -1,21 +1,3 @@
-rem
-rem
-rem    Licensed to the Apache Software Foundation (ASF) under one or more
-rem    contributor license agreements.  See the NOTICE file distributed with
-rem    this work for additional information regarding copyright ownership.
-rem    The ASF licenses this file to You under the Apache License, Version 2.0
-rem    (the "License"); you may not use this file except in compliance with
-rem    the License.  You may obtain a copy of the License at
-rem
-rem       http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem    Unless required by applicable law or agreed to in writing, software
-rem    distributed under the License is distributed on an "AS IS" BASIS,
-rem    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem    See the License for the specific language governing permissions and
-rem    limitations under the License.
-rem
-
 @if "%DEBUG%" == "" @echo off
 @rem ##########################################################################
 @rem
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 3f3ac98..61b9309 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -14,21 +14,53 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+buildscript {
+    ext.kotlin_version = '1.2.60'
+
+    repositories {
+        mavenCentral()
+        maven {
+            url 'http://repository.jetbrains.com/all'
+        }
+        maven {
+            url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots"
+        }
+    }
+
+    dependencies {
+        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+        classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+    }
+}
+
 plugins {
-    id "com.github.johnrengelman.shadow" version "1.2.4"
-    id 'com.github.maiflai.scalatest' version '0.22'
+    id "com.github.johnrengelman.shadow" version "2.0.4"
     id 'scala'
-    id 'java'
+}
+
+apply plugin: 'kotlin'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+junitPlatform {
+    filters {
+        engines {
+            include 'spek'
+        }
+    }
 }
 
 sourceCompatibility = 1.8
 targetCompatibility = 1.8
 
 repositories {
-    maven {
-        url "https://plugins.gradle.org/m2/"
-    }
+    maven { url "https://plugins.gradle.org/m2/" }
+    maven { url 'http://repository.jetbrains.com/all' }
+    maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+    maven { url "http://dl.bintray.com/jetbrains/spek" }
+    maven { url "http://oss.jfrog.org/artifactory/oss-snapshot-local" }
+
     mavenCentral()
+    jcenter()
 }
 
 dependencies {
@@ -41,4 +73,43 @@ dependencies {
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
     compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
-}
\ No newline at end of file
+
+    compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
+
+    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+    compile "org.jetbrains.kotlin:kotlin-reflect"
+    compile('com.uchuhimo:konf:0.11') {
+        exclude group: 'org.eclipse.jgit'
+    }
+
+    testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+    testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+    testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+
+    // spek requires kotlin-reflect, can be omitted if already in the classpath
+    testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+
+}
+
+sourceSets {
+    test {
+        resources.srcDirs += [file('src/test/resources')]
+    }
+
+}
+
+compileKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+compileTestKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+//
+//kotlin {
+//    experimental {
+//        coroutines 'enable'
+//    }
+//}
+
+//task copyToHome() {
+//}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
new file mode 100644
index 0000000..2f98fa6
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.configuration
+
+import com.uchuhimo.konf.Config
+import com.uchuhimo.konf.source.yaml.toYaml
+import java.io.File
+
+/** Loads the job-level YAML config for [env] from [repoPath] and layers per-action overrides on top of it. */
+class ConfigManager(private val env: String, private val repoPath: String, private val frameworkItems: List<String> = emptyList()) {
+
+    private val envFolder = "$repoPath/env/$env"
+
+    // this is currently public for testing reasons, need to reconsider
+    var config: Config = Config {
+        addSpec(Job)
+        // every framework item is registered as its own optional section
+        frameworkItems.forEach { addSpec(GenericSpec(it).spec) }
+    }
+
+    init {
+        // listFiles() returns null for a missing/unreadable folder; fail with a clear message instead of an NPE
+        val envFiles = requireNotNull(File(envFolder).listFiles()) { "environment folder not found: $envFolder" }
+        // the merged config is deliberately not printed: env files may hold secrets (maki.yml lists a passwords.yml)
+        config = envFiles.fold(config) { merged, file -> merged.from.yaml.file(file) }
+    }
+
+    /**
+     * Returns the job config overlaid with the action-level YAML found at [path] (a file or a folder,
+     * relative to the repo; `{env}` and `{action_name}` are substituted) or at `src/<action>/env/<env>`
+     * when [path] is empty. A missing location yields the job-level config unchanged.
+     * Files of a folder are applied one after another, in the order listFiles() returns them.
+     */
+    fun getActionConfiguration(action: String, path: String = ""): Config {
+        val actionPath = if (path.isEmpty()) {
+            "$repoPath/src/$action/env/$env"
+        } else {
+            "$repoPath/$path"
+                    .replace("{env}", env)
+                    .replace("{action_name}", action)
+        }
+
+        val configLocation = File(actionPath)
+        return when {
+            !configLocation.exists() -> config
+            // layer each file onto the accumulated result so that no file of the folder is dropped
+            configLocation.isDirectory ->
+                configLocation.listFiles().orEmpty().fold(config) { merged, file -> merged.from.yaml.file(file) }
+            else -> config.from.yaml.file(configLocation)
+        }
+    }
+
+    /** Renders the configuration resolved by [getActionConfiguration] as YAML text. */
+    fun getActionConfigContent(action: String, path: String = ""): String {
+        return getActionConfiguration(action, path).toYaml.toText()
+    }
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
new file mode 100644
index 0000000..ed6fa9e
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.configuration
+
+import com.uchuhimo.konf.ConfigSpec
+import com.uchuhimo.konf.OptionalItem
+
+class GenericSpec(configurationItem: String) { // wraps a single, dynamically named configuration item in its own spec
+    val spec = ConfigSpec() // created without a prefix argument
+    val items = OptionalItem(spec, configurationItem, emptyMap<String, String>()) // optional item named configurationItem, defaulting to an empty String-to-String map
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt
new file mode 100644
index 0000000..076b0af
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.configuration
+
+import com.uchuhimo.konf.ConfigSpec
+
+object Job : ConfigSpec("") { // job-level configuration items, declared with an empty spec prefix
+    val name by required<String>() // mandatory
+    val master by required<String>() // mandatory
+    val inputRootPath by required<String>() // mandatory
+    val outputRootPath by required<String>() // mandatory
+    val workingDir by required<String>() // mandatory
+    val configuration by optional(emptyMap<String, String>()) // optional; defaults to an empty map
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtil.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtil.kt
new file mode 100644
index 0000000..d479e5f
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtil.kt
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.eclipse.jgit.api.Git
+import java.io.File
+
+/** Fetches the job repository into the local "repo" working directory. */
+object GitUtil {
+    /** Clones [branch] of [repoAddress] into "repo", first removing any copy left by an earlier run. */
+    @JvmStatic
+    fun cloneRepo(repoAddress: String, branch: String) {
+        // JGit rejects a non-empty target directory, so always start from a clean one
+        val target = File("repo").apply { deleteRecursively() }
+        Git.cloneRepository().setURI(repoAddress).setDirectory(target).setBranch(branch).call().close()
+    }
+}
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
new file mode 100644
index 0000000..1aa729b
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.configuration
+
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.*
+import kotlin.test.assertEquals
+import java.io.File
+
+/** Spek specification for ConfigManager, run against the test_repo and spark_repo resource folders. */
+object ConfigManagerTests : Spek({
+
+    val marker = this.javaClass.getResource("/maki.yml").path
+
+    given("a ConfigManager for a job ") {
+
+        val repoPath = "${File(marker).parent}/test_repo"
+        val cfg = ConfigManager("test", repoPath)
+
+        it("loads the job level environment"){
+            assertEquals("yarn", cfg.config[Job.master])
+        }
+
+        on("getting an env for an action with default path") {
+            val startConf = cfg.getActionConfiguration("start")
+            it("loads the specific configuration defined in the actions folder"){
+                assertEquals("mesos", startConf[Job.master])
+            }
+        }
+
+        on("getting an env for an action with a conf: property in the maki.yml"){
+            val step2conf = cfg.getActionConfiguration("step2", "src/{action_name}/{env}/")
+
+            it("loads the specific configuration defined in the actions folder"){
+                assertEquals("test2", step2conf[Job.name])
+            }
+        }
+
+        on("getting an env for an action with no action level config"){
+            val step3conf = cfg.getActionConfiguration("step3")
+
+            it("loads only the job level conf"){
+                assertEquals("test", step3conf[Job.name])
+            }
+        }
+
+        on("receiving a path to a specific file" ){
+            val step4conf = cfg.getActionConfiguration("step4", "src/start/env/{env}/job.yml")
+
+            it("loads the specific configuration from the file"){
+                assertEquals("mesos", step4conf[Job.master])
+            }
+        }
+
+    }
+
+    given("a ConfigManager for a job with spark framework") {
+
+        val repoPath = "${File(marker).parent}/spark_repo"
+        val cfg = ConfigManager("test", repoPath, listOf("sparkConfiguration"))
+
+        it("load the framework configuration for spark"){
+            val spark: Map<String, String> = cfg.config["sparkConfiguration"]
+            // kotlin.test takes the expected value first, so failure messages read correctly
+            assertEquals("1g", spark["spark.executor.memory"])
+        }
+    }
+})
\ No newline at end of file
diff --git a/leader-common/src/test/resources/maki.yml b/leader-common/src/test/resources/maki.yml
new file mode 100644
index 0000000..576b7dd
--- /dev/null
+++ b/leader-common/src/test/resources/maki.yml
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+job-config: env/{env}/job.yml, env/{env}/passwords.yml, env/{env}/spark.yml
+job-name:    amaterasu-test
+flow:
+    - name: start
+      runner:
+          group: spark
+          type: scala
+      exports:
+          odd: parquet
+    - name: step2
+      config: src/{action_name}/{env}/
+      runner:
+          group: spark
+          type: scala
+      file: file2.scala
+...
diff --git a/leader-common/src/test/resources/spark_repo/env/test/job.yml b/leader-common/src/test/resources/spark_repo/env/test/job.yml
new file mode 100644
index 0000000..1f75ec6
--- /dev/null
+++ b/leader-common/src/test/resources/spark_repo/env/test/job.yml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: test
+master: yarn
+inputRootPath: /apps/amaterasu/input
+outputRootPath: /apps/amaterasu/output
+workingDir: /apps/amaterasu/work_dir
+configuration:
+    spark.cassandra.connection.host: 127.0.0.1
+    sourceTable: documents
\ No newline at end of file
diff --git a/leader-common/src/test/resources/spark_repo/env/test/spark.yml b/leader-common/src/test/resources/spark_repo/env/test/spark.yml
new file mode 100644
index 0000000..85f1431
--- /dev/null
+++ b/leader-common/src/test/resources/spark_repo/env/test/spark.yml
@@ -0,0 +1,3 @@
+sparkConfiguration:
+    spark.executor.extraJavaOptions: -XX:+PrintGCDetails
+    spark.executor.memory: 1g
\ No newline at end of file
diff --git a/leader-common/src/test/resources/test_repo/env/test/job.yml b/leader-common/src/test/resources/test_repo/env/test/job.yml
new file mode 100644
index 0000000..1f75ec6
--- /dev/null
+++ b/leader-common/src/test/resources/test_repo/env/test/job.yml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: test
+master: yarn
+inputRootPath: /apps/amaterasu/input
+outputRootPath: /apps/amaterasu/output
+workingDir: /apps/amaterasu/work_dir
+configuration:
+    spark.cassandra.connection.host: 127.0.0.1
+    sourceTable: documents
\ No newline at end of file
diff --git a/leader-common/src/test/resources/test_repo/src/start/env/test/job.yml b/leader-common/src/test/resources/test_repo/src/start/env/test/job.yml
new file mode 100644
index 0000000..26cfeeb
--- /dev/null
+++ b/leader-common/src/test/resources/test_repo/src/start/env/test/job.yml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+master: mesos
+configuration:
+    name: yaniv
\ No newline at end of file
diff --git a/leader-common/src/test/resources/test_repo/src/step2/test/job.yml b/leader-common/src/test/resources/test_repo/src/step2/test/job.yml
new file mode 100644
index 0000000..9203514
--- /dev/null
+++ b/leader-common/src/test/resources/test_repo/src/step2/test/job.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: test2
\ No newline at end of file
diff --git a/leader/build.gradle b/leader/build.gradle
index a0de6f5..114bbd3 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -19,6 +19,7 @@ plugins {
     id 'com.github.maiflai.scalatest' version '0.22'
     id 'scala'
     id 'java'
+
 }
 
 sourceCompatibility = 1.8
@@ -42,19 +43,18 @@ dependencies {
     compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0'
     compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
     compile group: 'org.apache.curator', name:'curator-test', version:'2.9.1'
-    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
+    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.4'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.4'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.4'
+    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.4'
     compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.2.19.v20160908'
     compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0'
-    compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
-    compile group: 'org.yaml', name: 'snakeyaml', version: '1.18'
+    compile group: 'org.yaml', name: 'snakeyaml', version: '1.23'
     compile group: 'commons-cli', name: 'commons-cli', version: '1.2'
     compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
     compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6'
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala
index 73b9dc5..b2492bb 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala
@@ -14,31 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.dsl
-
-import java.io.File
-
-import org.eclipse.jgit.api.Git
-
-import scala.reflect.io.Path
+//package org.apache.amaterasu.leader.dsl
+//
+//import java.io.File
+//
+//import org.eclipse.jgit.api.Git
+//
+//import scala.reflect.io.Path
 
 /**
   * The GitUtil class handles getting the job git repository
   */
-object GitUtil {
-
-  def cloneRepo(repoAddress: String, branch: String) = {
-
-    val path = Path("repo")
-    path.deleteRecursively()
-
-    //TODO: add authentication
-    Git.cloneRepository
-      .setURI(repoAddress)
-      .setDirectory(new File("repo"))
-      .setBranch(branch)
-      .call
-
-  }
-
-}
\ No newline at end of file
+//object GitUtil {
+//
+//  def cloneRepo(repoAddress: String, branch: String) = {
+//
+//    val path = Path("repo")
+//    path.deleteRecursively()
+//
+//    //TODO: add authentication
+//    val git = Git.cloneRepository
+//      .setURI(repoAddress)
+//      .setDirectory(new File("repo"))
+//      .setBranch(branch)
+//      .call
+//
+//    git.close()
+//  }
+//
+//}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
index 80cc1bc..234070d 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.BlockingQueue
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.dsl.{GitUtil, JobParser}
+import org.apache.amaterasu.leader.common.dsl.GitUtil
+import org.apache.amaterasu.leader.dsl.JobParser
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
index 3e1a67b..7630221 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
@@ -33,6 +33,7 @@ class FrameworkProvidersFactory {
   def getFramework(groupId: String): FrameworkSetupProvider = {
     providers(groupId)
   }
+
 }
 
 object FrameworkProvidersFactory extends Logging {
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index a6c8306..d68ae77 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -16,12 +16,14 @@
  */
 package org.apache.amaterasu.leader.mesos.schedulers
 
+import java.io.{File, PrintWriter, StringWriter}
 import java.util
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
 import java.util.{Collections, UUID}
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
@@ -29,6 +31,7 @@ import org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel
 import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType}
+import org.apache.amaterasu.leader.common.configuration.ConfigManager
 import org.apache.amaterasu.leader.common.utilities.DataLoader
 import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
 import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
@@ -55,6 +58,8 @@ class JobScheduler extends AmaterasuScheduler {
   private val version = props.getProperty("version")
   println(s"===> version  $version")*/
   LogManager.resetConfiguration()
+  private var frameworkFactory: FrameworkProvidersFactory = _
+  private var configManager: ConfigManager = _
   private var jobManager: JobManager = _
   private var client: CuratorFramework = _
   private var config: ClusterConfig = _
@@ -78,6 +83,9 @@ class JobScheduler extends AmaterasuScheduler {
   private val mapper = new ObjectMapper()
   mapper.registerModule(DefaultScalaModule)
 
+  private val yamlMapper = new ObjectMapper(new YAMLFactory())
+  yamlMapper.registerModule(DefaultScalaModule)
+
   def error(driver: SchedulerDriver, message: String) {}
 
   def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {}
@@ -130,6 +138,7 @@ class JobScheduler extends AmaterasuScheduler {
   }
 
   def resourceOffers(driver: SchedulerDriver, offers: util.List[Offer]): Unit = {
+
     for (offer <- offers.asScala) {
 
       if (validateOffer(offer)) {
@@ -145,6 +154,18 @@ class JobScheduler extends AmaterasuScheduler {
           if (actionData != null) {
             val taskId = Protos.TaskID.newBuilder().setValue(actionData.id).build()
 
+            // setting up the configuration files for the container
+            val envYaml = configManager.getActionConfigContent(actionData.name, "") //TODO: replace with the value in actionData.config
+            writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml")
+
+            val dataStores = DataLoader.getTaskData(actionData, env).exports
+            val writer = new StringWriter()
+            yamlMapper.writeValue(writer, dataStores)
+            val dataStoresYaml = writer.toString
+            writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml")
+
+            writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml")
+
             offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
 
             // atomically adding a record for the slave, I'm storing all the actions
@@ -154,7 +175,7 @@ class JobScheduler extends AmaterasuScheduler {
             val slaveActions = executionMap(offer.getSlaveId.toString)
             slaveActions.put(taskId.getValue, ActionStatus.started)
 
-            val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+
             val frameworkProvider = frameworkFactory.providers(actionData.groupId)
             val runnerProvider = frameworkProvider.getRunnerProvider(actionData.typeId)
 
@@ -182,6 +203,27 @@ class JobScheduler extends AmaterasuScheduler {
                     .setExtract(false)
                     .build())
 
+                // Getting env.yaml
+                command.addUris(URI.newBuilder
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml")
+                  .setExecutable(false)
+                  .setExtract(true)
+                  .build())
+
+                // Getting datastores.yaml
+                command.addUris(URI.newBuilder
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml")
+                  .setExecutable(false)
+                  .setExtract(true)
+                  .build())
+
+                // Getting runtime.yaml
+                command.addUris(URI.newBuilder
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml")
+                  .setExecutable(false)
+                  .setExtract(true)
+                  .build())
+
                 // Getting framework resources
                 frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
                   .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}")
@@ -198,7 +240,7 @@ class JobScheduler extends AmaterasuScheduler {
 
                 command
                   .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh")
+                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
                     .setExecutable(true)
                     .setExtract(false)
                     .build())
@@ -207,6 +249,7 @@ class JobScheduler extends AmaterasuScheduler {
                     .setExecutable(false)
                     .setExtract(false)
                     .build())
+
                 executor = ExecutorInfo
                   .newBuilder
                   .setData(ByteString.copyFrom(execData))
@@ -239,6 +282,9 @@ class JobScheduler extends AmaterasuScheduler {
           else if (jobManager.outOfActions) {
             log.info(s"framework ${jobManager.jobId} execution finished")
 
+            val repo = new File("repo/")
+            repo.delete()
+
             HttpServer.stop()
             driver.declineOffer(offer.getId)
             driver.stop()
@@ -284,8 +330,15 @@ class JobScheduler extends AmaterasuScheduler {
       )
 
     }
+
+    frameworkFactory = FrameworkProvidersFactory(env, config)
+    val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
+    configManager = new ConfigManager(env, "repo", items)
+
     jobManager.start()
 
+    createJobDir(jobManager.jobId)
+
   }
 
   def reregistered(driver: SchedulerDriver, masterInfo: Protos.MasterInfo) {}
@@ -310,6 +363,42 @@ class JobScheduler extends AmaterasuScheduler {
     }
 
   }
+
+  private def createJobDir(jobId: String): Unit = {
+    val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
+    val amaHome = new File(jarFile.getParent).getParent
+    val jobDir = s"$amaHome/dist/$jobId/"
+
+    val dir = new File(jobDir)
+    if (!dir.exists()) {
+      dir.mkdir()
+    }
+  }
+
+  /**
+    * This function creates an action-specific configuration file (e.g. env.yaml) in the dist folder with the following path:
+    * dist/{jobId}/{actionName}/{fileName} to be added to the container
+    *
+    * @param configuration A YAML string to be written to the env file
+    * @param jobId         the jobId
+    * @param actionName    the name of the action
+    */
+  def writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String): Unit = {
+    val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
+    val amaHome = new File(jarFile.getParent).getParent
+    val envLocation = s"$amaHome/dist/$jobId/$actionName/"
+
+    val dir = new File(envLocation)
+    if (!dir.exists()) {
+      dir.mkdir()
+    }
+
+
+    new PrintWriter(s"$envLocation/$fileName") {
+      write(configuration)
+      close
+    }
+  }
 }
 
 object JobScheduler {
@@ -343,6 +432,7 @@ object JobScheduler {
     scheduler.client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
     scheduler.client.start()
     scheduler.config = config
+
     scheduler
 
   }
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index e28d99f..1bbaa15 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -89,6 +89,8 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
     val stat = fs.getFileStatus(path)
     val fileResource = Records.newRecord(classOf[LocalResource])
 
+    fileResource.setShouldBeUploadedToSharedCache(true)
+    fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
     fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path))
     fileResource.setSize(stat.getLen)
     fileResource.setTimestamp(stat.getModificationTime)
diff --git a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala b/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
index cef7cb0..feccfcd 100755
--- a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.amaterasu.integration
 
-import org.apache.amaterasu.leader.dsl.GitUtil
+import org.apache.amaterasu.leader.common.dsl.GitUtil
 import org.scalatest.{FlatSpec, Matchers}
 
 import scala.reflect.io.Path
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
index 07a28b1..b676c1c 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
@@ -33,4 +33,6 @@
 
     RunnerSetupProvider getRunnerProvider(String runnerId);
 
+    String[] getConfigurationItems();
+
 }
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services