You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@amaterasu.apache.org by GitBox <gi...@apache.org> on 2018/10/13 06:00:18 UTC
[GitHub] nadav-har-tzvi closed pull request #34: Amaterasu 49 - mesos
implementation
nadav-har-tzvi closed pull request #34: Amaterasu 49 - mesos implementation
URL: https://github.com/apache/incubator-amaterasu/pull/34
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/build.gradle b/build.gradle
index 94b5dad..06a07e8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,20 +19,25 @@ plugins {
}
apply plugin: 'distribution'
+apply plugin: 'project-report'
+
+htmlDependencyReport {
+ projects = project.allprojects
+}
allprojects {
group 'org.apache.amaterasu'
version '0.2.0-incubating-rc4'
}
-task copyLeagalFiles(type: Copy) {
+task copyLegalFiles(type: Copy) {
from "./DISCLAIMER", "./LICENSE", "./NOTICE"
into "${buildDir}/amaterasu"
}
task buildHomeDir() {
dependsOn subprojects.collect { getTasksByName('copyToHome', true) }
- dependsOn copyLeagalFiles
+ dependsOn copyLegalFiles
}
distributions {
diff --git a/frameworks/spark/dispatcher/build.gradle b/frameworks/spark/dispatcher/build.gradle
index a95d958..a2bf9fe 100644
--- a/frameworks/spark/dispatcher/build.gradle
+++ b/frameworks/spark/dispatcher/build.gradle
@@ -39,6 +39,7 @@ dependencies {
compile project(':common')
compile project(':leader-common')
compile project(':amaterasu-sdk')
+ compile 'com.uchuhimo:konf:0.11'
}
task copyToHomeBin(type: Copy) {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index ac442d5..79bd080 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -103,4 +103,6 @@ class SparkSetupProvider extends FrameworkSetupProvider {
override def getRunnerProvider(runnerId: String): RunnerSetupProvider = {
runnerProviders(runnerId)
}
+
+ override def getConfigurationItems = Array("sparkConfiguration", "sparkExecutor")
}
\ No newline at end of file
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 6ffa237..1948b90 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 38185b0..a95009c 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,5 @@
-#Sun Jun 10 14:41:17 AEST 2018
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-bin.zip
diff --git a/gradlew b/gradlew
index 3efb0e9..cccdd3d 100755
--- a/gradlew
+++ b/gradlew
@@ -1,20 +1,4 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
+#!/usr/bin/env sh
##############################################################################
##
@@ -49,11 +33,11 @@ DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
-warn ( ) {
+warn () {
echo "$*"
}
-die ( ) {
+die () {
echo
echo "$*"
echo
@@ -170,16 +154,19 @@ if $cygwin ; then
esac
fi
-# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
-function splitJvmOpts() {
- JVM_OPTS=("$@")
+# Escape application args
+save () {
+ for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
+ echo " "
}
-eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
-JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+APP_ARGS=$(save "$@")
+
+# Collect all arguments for the java command, following the shell quoting and substitution rules
+eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
-if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then
+if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
cd "$(dirname "$0")"
fi
-exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
+exec "$JAVACMD" "$@"
diff --git a/gradlew.bat b/gradlew.bat
index f664a3f..e95643d 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -1,21 +1,3 @@
-rem
-rem
-rem Licensed to the Apache Software Foundation (ASF) under one or more
-rem contributor license agreements. See the NOTICE file distributed with
-rem this work for additional information regarding copyright ownership.
-rem The ASF licenses this file to You under the Apache License, Version 2.0
-rem (the "License"); you may not use this file except in compliance with
-rem the License. You may obtain a copy of the License at
-rem
-rem http://www.apache.org/licenses/LICENSE-2.0
-rem
-rem Unless required by applicable law or agreed to in writing, software
-rem distributed under the License is distributed on an "AS IS" BASIS,
-rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-rem See the License for the specific language governing permissions and
-rem limitations under the License.
-rem
-
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 3f3ac98..61b9309 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -14,21 +14,53 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+buildscript {
+ ext.kotlin_version = '1.2.60'
+
+ repositories {
+ mavenCentral()
+ maven {
+ url 'http://repository.jetbrains.com/all'
+ }
+ maven {
+ url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots"
+ }
+ }
+
+ dependencies {
+ classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+ classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+ }
+}
+
plugins {
- id "com.github.johnrengelman.shadow" version "1.2.4"
- id 'com.github.maiflai.scalatest' version '0.22'
+ id "com.github.johnrengelman.shadow" version "2.0.4"
id 'scala'
- id 'java'
+}
+
+apply plugin: 'kotlin'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+junitPlatform {
+ filters {
+ engines {
+ include 'spek'
+ }
+ }
}
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
- maven {
- url "https://plugins.gradle.org/m2/"
- }
+ maven { url "https://plugins.gradle.org/m2/" }
+ maven { url 'http://repository.jetbrains.com/all' }
+ maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+ maven { url "http://dl.bintray.com/jetbrains/spek" }
+ maven { url "http://oss.jfrog.org/artifactory/oss-snapshot-local" }
+
mavenCentral()
+ jcenter()
}
dependencies {
@@ -41,4 +73,43 @@ dependencies {
compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
-}
\ No newline at end of file
+
+ compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
+
+ compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+ compile "org.jetbrains.kotlin:kotlin-reflect"
+ compile('com.uchuhimo:konf:0.11') {
+ exclude group: 'org.eclipse.jgit'
+ }
+
+ testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+ testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+ testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+
+ // spek requires kotlin-reflect, can be omitted if already in the classpath
+ testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+
+}
+
+sourceSets {
+ test {
+ resources.srcDirs += [file('src/test/resources')]
+ }
+
+}
+
+compileKotlin {
+ kotlinOptions.jvmTarget = "1.8"
+}
+compileTestKotlin {
+ kotlinOptions.jvmTarget = "1.8"
+}
+//
+//kotlin {
+// experimental {
+// coroutines 'enable'
+// }
+//}
+
+//task copyToHome() {
+//}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
new file mode 100644
index 0000000..2f98fa6
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.configuration
+
+import com.uchuhimo.konf.Config
+import com.uchuhimo.konf.source.yaml.toYaml
+import java.io.File
+
+class ConfigManager(private val env: String, private val repoPath: String, private val frameworkItems: List<String> = emptyList()) {
+
+ private val envFolder = "$repoPath/env/$env"
+
+ // this is currently public for testing reasons, need to reconsider
+ var config: Config = Config {
+ addSpec(Job)
+ for (item in frameworkItems) {
+ val frameworkSpec = GenericSpec(item)
+ addSpec(frameworkSpec.spec)
+ }
+ }
+
+ init {
+ for (file in File(envFolder).listFiles()) {
+ config = config.from.yaml.file(file)
+ println(config.toYaml.toText())
+ }
+ }
+
+ fun getActionConfiguration(action: String, path: String = ""): Config {
+
+ val actionPath = if (path.isEmpty()) {
+ "$repoPath/src/$action/env/$env"
+ } else {
+ "$repoPath/$path"
+ .replace("{env}", env)
+ .replace("{action_name}", action)
+ }
+
+ var result = config
+
+ val configLocation = File(actionPath)
+ if (configLocation.exists()) {
+ if (configLocation.isDirectory) {
+ for (file in File(actionPath).listFiles()) {
+ result = config.from.yaml.file(file)
+ }
+ } else {
+ result = config.from.yaml.file(configLocation)
+ }
+ }
+ return result
+ }
+
+ fun getActionConfigContent(action: String, path: String = ""): String {
+ return getActionConfiguration(action, path).toYaml.toText()
+ }
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
new file mode 100644
index 0000000..ed6fa9e
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.configuration
+
+import com.uchuhimo.konf.ConfigSpec
+import com.uchuhimo.konf.OptionalItem
+
+class GenericSpec(configurationItem: String) {
+ val spec = ConfigSpec()
+ val items = OptionalItem(spec, configurationItem, emptyMap<String, String>())
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt
new file mode 100644
index 0000000..076b0af
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.configuration
+
+import com.uchuhimo.konf.ConfigSpec
+
+object Job : ConfigSpec("") {
+ val name by required<String>()
+ val master by required<String>()
+ val inputRootPath by required<String>()
+ val outputRootPath by required<String>()
+ val workingDir by required<String>()
+ val configuration by optional(emptyMap<String, String>())
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtil.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtil.kt
new file mode 100644
index 0000000..d479e5f
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtil.kt
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.eclipse.jgit.api.Git
+import java.io.File
+
+object GitUtil {
+ @JvmStatic
+ fun cloneRepo(repoAddress: String, branch: String) {
+ Git.cloneRepository().apply {
+ setURI(repoAddress)
+ setDirectory(File("repo"))
+ setBranch(branch)
+ }.call().close()
+ }
+}
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
new file mode 100644
index 0000000..1aa729b
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.configuration
+
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.*
+import kotlin.test.assertEquals
+import java.io.File
+
+object ConfigManagerTests : Spek({
+
+ val marker = this.javaClass.getResource("/maki.yml").path
+
+ given("a ConfigManager for a job ") {
+
+ val repoPath = "${File(marker).parent}/test_repo"
+ val cfg = ConfigManager("test", repoPath)
+
+ it("loads the job level environment"){
+ assertEquals(cfg.config[Job.master] , "yarn")
+ }
+
+ on("getting an env for an action with default path") {
+ val startConf = cfg.getActionConfiguration("start")
+ it("loads the specific configuration defined in the actions folder"){
+ assertEquals(startConf[Job.master] , "mesos")
+ }
+ }
+
+ on("getting an env for an action with a conf: property in the maki.yml"){
+ val step2conf = cfg.getActionConfiguration("step2", "src/{action_name}/{env}/")
+
+ it("loads the specific configuration defined in the actions folder"){
+ assertEquals(step2conf[Job.name] , "test2")
+ }
+ }
+
+ on("getting an env for an action with no action level config"){
+ val step3conf = cfg.getActionConfiguration("step3")
+
+ it("loads only the job level conf"){
+ assertEquals(step3conf[Job.name] , "test")
+ }
+ }
+
+ on("receiving a path to a specific file" ){
+ val step4conf = cfg.getActionConfiguration("step4", "src/start/env/{env}/job.yml")
+
+ it("loads the specific configuration from the file"){
+ assertEquals(step4conf[Job.master] , "mesos")
+ }
+ }
+
+
+ }
+
+ given("a ConfigManager for a job with spark framework") {
+
+ val repoPath = "${File(marker).parent}/spark_repo"
+ val cfg = ConfigManager("test", repoPath, listOf("sparkConfiguration"))
+
+ it("load the framework configuration for spark"){
+ val spark: Map<String, String> = cfg.config["sparkConfiguration"]
+ assertEquals(spark["spark.executor.memory"], "1g")
+ }
+ }
+})
\ No newline at end of file
diff --git a/leader-common/src/test/resources/maki.yml b/leader-common/src/test/resources/maki.yml
new file mode 100644
index 0000000..576b7dd
--- /dev/null
+++ b/leader-common/src/test/resources/maki.yml
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+job-config: env/{env}/job.yml, env/{env}/passwords.yml, env/{env}/spark.yml
+job-name: amaterasu-test
+flow:
+ - name: start
+ runner:
+ group: spark
+ type: scala
+ exports:
+ odd: parquet
+ - name: step2
+ config: src/{action_name}/{env}/
+ runner:
+ group: spark
+ type: scala
+ file: file2.scala
+...
diff --git a/leader-common/src/test/resources/spark_repo/env/test/job.yml b/leader-common/src/test/resources/spark_repo/env/test/job.yml
new file mode 100644
index 0000000..1f75ec6
--- /dev/null
+++ b/leader-common/src/test/resources/spark_repo/env/test/job.yml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: test
+master: yarn
+inputRootPath: /apps/amaterasu/input
+outputRootPath: /apps/amaterasu/output
+workingDir: /apps/amaterasu/work_dir
+configuration:
+ spark.cassandra.connection.host: 127.0.0.1
+ sourceTable: documents
\ No newline at end of file
diff --git a/leader-common/src/test/resources/spark_repo/env/test/spark.yml b/leader-common/src/test/resources/spark_repo/env/test/spark.yml
new file mode 100644
index 0000000..85f1431
--- /dev/null
+++ b/leader-common/src/test/resources/spark_repo/env/test/spark.yml
@@ -0,0 +1,3 @@
+sparkConfiguration:
+ spark.executor.extraJavaOptions: -XX:+PrintGCDetails
+ spark.executor.memory: 1g
\ No newline at end of file
diff --git a/leader-common/src/test/resources/test_repo/env/test/job.yml b/leader-common/src/test/resources/test_repo/env/test/job.yml
new file mode 100644
index 0000000..1f75ec6
--- /dev/null
+++ b/leader-common/src/test/resources/test_repo/env/test/job.yml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: test
+master: yarn
+inputRootPath: /apps/amaterasu/input
+outputRootPath: /apps/amaterasu/output
+workingDir: /apps/amaterasu/work_dir
+configuration:
+ spark.cassandra.connection.host: 127.0.0.1
+ sourceTable: documents
\ No newline at end of file
diff --git a/leader-common/src/test/resources/test_repo/src/start/env/test/job.yml b/leader-common/src/test/resources/test_repo/src/start/env/test/job.yml
new file mode 100644
index 0000000..26cfeeb
--- /dev/null
+++ b/leader-common/src/test/resources/test_repo/src/start/env/test/job.yml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+master: mesos
+configuration:
+ name: yaniv
\ No newline at end of file
diff --git a/leader-common/src/test/resources/test_repo/src/step2/test/job.yml b/leader-common/src/test/resources/test_repo/src/step2/test/job.yml
new file mode 100644
index 0000000..9203514
--- /dev/null
+++ b/leader-common/src/test/resources/test_repo/src/step2/test/job.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: test2
\ No newline at end of file
diff --git a/leader/build.gradle b/leader/build.gradle
index a0de6f5..114bbd3 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -19,6 +19,7 @@ plugins {
id 'com.github.maiflai.scalatest' version '0.22'
id 'scala'
id 'java'
+
}
sourceCompatibility = 1.8
@@ -42,19 +43,18 @@ dependencies {
compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0'
compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
compile group: 'org.apache.curator', name:'curator-test', version:'2.9.1'
- compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
- compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
+ compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.4'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.4'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.4'
+ compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.4'
compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.2.19.v20160908'
compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.2.19.v20160908'
compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908'
compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.2.19.v20160908'
compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.2.19.v20160908'
compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0'
- compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
- compile group: 'org.yaml', name: 'snakeyaml', version: '1.18'
+ compile group: 'org.yaml', name: 'snakeyaml', version: '1.23'
compile group: 'commons-cli', name: 'commons-cli', version: '1.2'
compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6'
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala
index 73b9dc5..b2492bb 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/GitUtil.scala
@@ -14,31 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.leader.dsl
-
-import java.io.File
-
-import org.eclipse.jgit.api.Git
-
-import scala.reflect.io.Path
+//package org.apache.amaterasu.leader.dsl
+//
+//import java.io.File
+//
+//import org.eclipse.jgit.api.Git
+//
+//import scala.reflect.io.Path
/**
* The GitUtil class handles getting the job git repository
*/
-object GitUtil {
-
- def cloneRepo(repoAddress: String, branch: String) = {
-
- val path = Path("repo")
- path.deleteRecursively()
-
- //TODO: add authentication
- Git.cloneRepository
- .setURI(repoAddress)
- .setDirectory(new File("repo"))
- .setBranch(branch)
- .call
-
- }
-
-}
\ No newline at end of file
+//object GitUtil {
+//
+// def cloneRepo(repoAddress: String, branch: String) = {
+//
+// val path = Path("repo")
+// path.deleteRecursively()
+//
+// //TODO: add authentication
+// val git = Git.cloneRepository
+// .setURI(repoAddress)
+// .setDirectory(new File("repo"))
+// .setBranch(branch)
+// .call
+//
+// git.close()
+// }
+//
+//}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
index 80cc1bc..234070d 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.BlockingQueue
import org.apache.amaterasu.common.configuration.enums.ActionStatus
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.dsl.{GitUtil, JobParser}
+import org.apache.amaterasu.leader.common.dsl.GitUtil
+import org.apache.amaterasu.leader.dsl.JobParser
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
index 3e1a67b..7630221 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala
@@ -33,6 +33,7 @@ class FrameworkProvidersFactory {
def getFramework(groupId: String): FrameworkSetupProvider = {
providers(groupId)
}
+
}
object FrameworkProvidersFactory extends Logging {
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index a6c8306..d68ae77 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -16,12 +16,14 @@
*/
package org.apache.amaterasu.leader.mesos.schedulers
+import java.io.{File, PrintWriter, StringWriter}
import java.util
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import java.util.{Collections, UUID}
import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.configuration.enums.ActionStatus
@@ -29,6 +31,7 @@ import org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel
import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType}
+import org.apache.amaterasu.leader.common.configuration.ConfigManager
import org.apache.amaterasu.leader.common.utilities.DataLoader
import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
@@ -55,6 +58,8 @@ class JobScheduler extends AmaterasuScheduler {
private val version = props.getProperty("version")
println(s"===> version $version")*/
LogManager.resetConfiguration()
+ private var frameworkFactory: FrameworkProvidersFactory = _
+ private var configManager: ConfigManager = _
private var jobManager: JobManager = _
private var client: CuratorFramework = _
private var config: ClusterConfig = _
@@ -78,6 +83,9 @@ class JobScheduler extends AmaterasuScheduler {
private val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
+ private val yamlMapper = new ObjectMapper(new YAMLFactory())
+ yamlMapper.registerModule(DefaultScalaModule)
+
def error(driver: SchedulerDriver, message: String) {}
def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {}
@@ -130,6 +138,7 @@ class JobScheduler extends AmaterasuScheduler {
}
def resourceOffers(driver: SchedulerDriver, offers: util.List[Offer]): Unit = {
+
for (offer <- offers.asScala) {
if (validateOffer(offer)) {
@@ -145,6 +154,18 @@ class JobScheduler extends AmaterasuScheduler {
if (actionData != null) {
val taskId = Protos.TaskID.newBuilder().setValue(actionData.id).build()
+ // setting up the configuration files for the container
+ val envYaml = configManager.getActionConfigContent(actionData.name, "") //TODO: replace with the value in actionData.config
+ writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml")
+
+ val dataStores = DataLoader.getTaskData(actionData, env).exports
+ val writer = new StringWriter()
+ yamlMapper.writeValue(writer, dataStores)
+ val dataStoresYaml = writer.toString
+ writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml")
+
+ writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml")
+
offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
// atomically adding a record for the slave, I'm storing all the actions
@@ -154,7 +175,7 @@ class JobScheduler extends AmaterasuScheduler {
val slaveActions = executionMap(offer.getSlaveId.toString)
slaveActions.put(taskId.getValue, ActionStatus.started)
- val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+
val frameworkProvider = frameworkFactory.providers(actionData.groupId)
val runnerProvider = frameworkProvider.getRunnerProvider(actionData.typeId)
@@ -182,6 +203,27 @@ class JobScheduler extends AmaterasuScheduler {
.setExtract(false)
.build())
+ // Getting env.yaml
+ command.addUris(URI.newBuilder
+ .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml")
+ .setExecutable(false)
+ .setExtract(true)
+ .build())
+
+ // Getting datastores.yaml
+ command.addUris(URI.newBuilder
+ .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml")
+ .setExecutable(false)
+ .setExtract(true)
+ .build())
+
+ // Getting runtime.yaml
+ command.addUris(URI.newBuilder
+ .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml")
+ .setExecutable(false)
+ .setExtract(true)
+ .build())
+
// Getting framework resources
frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}")
@@ -198,7 +240,7 @@ class JobScheduler extends AmaterasuScheduler {
command
.addUris(URI.newBuilder()
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh")
+ .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
.setExecutable(true)
.setExtract(false)
.build())
@@ -207,6 +249,7 @@ class JobScheduler extends AmaterasuScheduler {
.setExecutable(false)
.setExtract(false)
.build())
+
executor = ExecutorInfo
.newBuilder
.setData(ByteString.copyFrom(execData))
@@ -239,6 +282,9 @@ class JobScheduler extends AmaterasuScheduler {
else if (jobManager.outOfActions) {
log.info(s"framework ${jobManager.jobId} execution finished")
+ val repo = new File("repo/")
+ repo.delete()
+
HttpServer.stop()
driver.declineOffer(offer.getId)
driver.stop()
@@ -284,8 +330,15 @@ class JobScheduler extends AmaterasuScheduler {
)
}
+
+ frameworkFactory = FrameworkProvidersFactory(env, config)
+ val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
+ configManager = new ConfigManager(env, "repo", items)
+
jobManager.start()
+ createJobDir(jobManager.jobId)
+
}
def reregistered(driver: SchedulerDriver, masterInfo: Protos.MasterInfo) {}
@@ -310,6 +363,42 @@ class JobScheduler extends AmaterasuScheduler {
}
}
+
+ private def createJobDir(jobId: String): Unit = {
+ val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
+ val amaHome = new File(jarFile.getParent).getParent
+ val jobDir = s"$amaHome/dist/$jobId/"
+
+ val dir = new File(jobDir)
+ if (!dir.exists()) {
+ dir.mkdir()
+ }
+ }
+
+ /**
+ * This function creates an action specific env.yml file int the dist folder with the following path:
+ * dist/{jobId}/{actionName}/env.yml to be added to the container
+ *
+ * @param configuration A YAML string to be written to the env file
+ * @param jobId the jobId
+ * @param actionName the name of the action
+ */
+ def writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String): Unit = {
+ val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
+ val amaHome = new File(jarFile.getParent).getParent
+ val envLocation = s"$amaHome/dist/$jobId/$actionName/"
+
+ val dir = new File(envLocation)
+ if (!dir.exists()) {
+ dir.mkdir()
+ }
+
+
+ new PrintWriter(s"$envLocation/$fileName") {
+ write(configuration)
+ close
+ }
+ }
}
object JobScheduler {
@@ -343,6 +432,7 @@ object JobScheduler {
scheduler.client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
scheduler.client.start()
scheduler.config = config
+
scheduler
}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index e28d99f..1bbaa15 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -89,6 +89,8 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
val stat = fs.getFileStatus(path)
val fileResource = Records.newRecord(classOf[LocalResource])
+ fileResource.setShouldBeUploadedToSharedCache(true)
+ fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path))
fileResource.setSize(stat.getLen)
fileResource.setTimestamp(stat.getModificationTime)
diff --git a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala b/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
index cef7cb0..feccfcd 100755
--- a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
@@ -16,7 +16,7 @@
*/
package org.apache.amaterasu.integration
-import org.apache.amaterasu.leader.dsl.GitUtil
+import org.apache.amaterasu.leader.common.dsl.GitUtil
import org.scalatest.{FlatSpec, Matchers}
import scala.reflect.io.Path
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
index 07a28b1..b676c1c 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
@@ -33,4 +33,6 @@
RunnerSetupProvider getRunnerProvider(String runnerId);
+ String[] getConfigurationItems();
+
}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services