You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/22 22:06:17 UTC
[2/2] incubator-reef git commit: [REEF-30] Adding reef-runtime-mesos
[REEF-30] Adding reef-runtime-mesos
This adds reef-runtime-mesos. Please refer to the pull request for details:
https://github.com/apache/incubator-reef/pull/52
JIRA: [REEF-30]: https://issues.apache.org/jira/browse/REEF-30
Pull Request: This closes #52
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c908a526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c908a526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c908a526
Branch: refs/heads/master
Commit: c908a526d8ad14ab9a35be351a89160fe6576992
Parents: 4bc3282
Author: John Yang <jo...@gmail.com>
Authored: Mon Jan 12 16:15:53 2015 +0900
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Jan 22 13:00:18 2015 -0800
----------------------------------------------------------------------
.gitignore | 3 +-
bin/runmesostests.sh | 34 ++
pom.xml | 23 +
reef-examples/pom.xml | 5 +
.../reef/examples/hello/HelloREEFMesos.java | 55 ++
reef-runtime-mesos/pom.xml | 95 ++++
.../src/main/avro/EvaluatorControl.avsc | 47 ++
.../runtime/mesos/MesosClasspathProvider.java | 92 ++++
.../mesos/client/MesosClientConfiguration.java | 68 +++
.../mesos/client/MesosJobSubmissionHandler.java | 141 ++++++
.../mesos/client/parameters/MasterIp.java | 26 +
.../mesos/client/parameters/RootFolder.java | 26 +
.../mesos/driver/MesosDriverConfiguration.java | 98 ++++
.../driver/MesosResourceLaunchHandler.java | 129 +++++
.../driver/MesosResourceReleaseHandler.java | 42 ++
.../driver/MesosResourceRequestHandler.java | 42 ++
.../mesos/driver/MesosRuntimeStartHandler.java | 38 ++
.../mesos/driver/MesosRuntimeStopHandler.java | 38 ++
.../driver/MesosSchedulerDriverExecutor.java | 42 ++
.../runtime/mesos/driver/REEFEventHandlers.java | 65 +++
.../reef/runtime/mesos/driver/REEFExecutor.java | 50 ++
.../runtime/mesos/driver/REEFExecutors.java | 64 +++
.../runtime/mesos/driver/REEFScheduler.java | 506 +++++++++++++++++++
.../mesos/driver/parameters/MesosMasterIp.java | 26 +
.../evaluator/EvaluatorControlHandler.java | 55 ++
.../runtime/mesos/evaluator/REEFExecutor.java | 249 +++++++++
.../evaluator/parameters/MesosExecutorId.java | 26 +
.../util/HDFSConfigurationConstructor.java | 35 ++
.../runtime/mesos/util/MesosErrorHandler.java | 43 ++
.../runtime/mesos/util/MesosRemoteManager.java | 62 +++
.../mesos/util/MesosRemoteManagerCodec.java | 68 +++
reef-tests/pom.xml | 5 +
.../apache/reef/tests/MesosTestEnvironment.java | 68 +++
.../reef/tests/TestEnvironmentFactory.java | 16 +-
.../apache/reef/tests/YarnTestEnvironment.java | 2 +-
35 files changed, 2380 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index ea81d9e..2556de8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,4 +18,5 @@ nb-configuration.xml
.idea
atlassian-ide-plugin.xml
REEF_LOCAL_RUNTIME
-profile-*.json
\ No newline at end of file
+REEF_MESOS_RUNTIME
+profile-*.json
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/bin/runmesostests.sh
----------------------------------------------------------------------
diff --git a/bin/runmesostests.sh b/bin/runmesostests.sh
new file mode 100755
index 0000000..350995d
--- /dev/null
+++ b/bin/runmesostests.sh
@@ -0,0 +1,34 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ "$#" -ne 1 ]; then
+ echo "Please specify REEF_TEST_MESOS_MASTER_IP as an argument"
+ exit 1
+fi
+
+export REEF_TEST_MESOS=true
+export REEF_TEST_MESOS_MASTER_IP=$1
+
+DEPENDENCY_JAR=`echo $REEF_HOME/reef-tests/target/reef-tests-*-test-jar-with-dependencies.jar`
+CLASSPATH=`hadoop classpath`
+
+CMD="java -cp $DEPENDENCY_JAR:$CLASSPATH org.junit.runner.JUnitCore org.apache.reef.tests.AllTestsSuite"
+echo $CMD
+$CMD
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8ed7eed..cdbc73e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -357,6 +357,22 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jcl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
@@ -571,6 +587,12 @@ under the License.
<version>${jackson.version}</version>
</dependency>
<!-- End of Jackson -->
+
+ <dependency>
+ <groupId>org.apache.mesos</groupId>
+ <artifactId>mesos</artifactId>
+ <version>0.21.0</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -587,6 +609,7 @@ under the License.
<module>reef-runtime-hdinsight</module>
<module>reef-runtime-local</module>
<module>reef-runtime-yarn</module>
+ <module>reef-runtime-mesos</module>
<module>reef-tang</module>
<module>reef-tests</module>
<module>reef-wake</module>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-examples/pom.xml
----------------------------------------------------------------------
diff --git a/reef-examples/pom.xml b/reef-examples/pom.xml
index 66eeb34..b2eb3b7 100644
--- a/reef-examples/pom.xml
+++ b/reef-examples/pom.xml
@@ -48,6 +48,11 @@ under the License.
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-mesos</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>reef-io</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java
----------------------------------------------------------------------
diff --git a/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java
new file mode 100644
index 0000000..b3b962b
--- /dev/null
+++ b/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFMesos.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.mesos.client.MesosClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class HelloREEFMesos {
+ private static final Logger LOG = Logger.getLogger(HelloREEFMesos.class.getName());
+
+ private static Configuration getDriverConfiguration() {
+ return DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, HelloREEFMesos.class.getProtectionDomain().getCodeSource().getLocation().getFile())
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF")
+ .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class)
+ .build();
+ }
+
+ /**
+ * MASTER_IP(Mesos Master IP) is set to "localhost:5050".
+ * You may change it to suit your cluster environment.
+ */
+ public static void main(final String[] args) throws InjectionException {
+ final LauncherStatus status = DriverLauncher
+ .getLauncher(MesosClientConfiguration.CONF
+ .set(MesosClientConfiguration.MASTER_IP, "localhost:5050")
+ .build())
+ .run(getDriverConfiguration());
+ LOG.log(Level.INFO, "REEF job completed: {0}", status);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/pom.xml b/reef-runtime-mesos/pom.xml
new file mode 100644
index 0000000..a5aa333
--- /dev/null
+++ b/reef-runtime-mesos/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.11.0-incubating-SNAPSHOT</version>
+ </parent>
+ <name>REEF Runtime for Mesos</name>
+ <artifactId>reef-runtime-mesos</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mesos</groupId>
+ <artifactId>mesos</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+ <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/avro</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc b/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
new file mode 100644
index 0000000..ef4a152
--- /dev/null
+++ b/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+ {
+ "namespace": "org.apache.reef.runtime.mesos.util",
+ "type": "record",
+ "name": "EvaluatorLaunch",
+ "fields": [
+ { "name": "identifier", "type": "string" },
+ { "name": "command", "type": "string" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.runtime.mesos.util",
+ "type": "record",
+ "name": "EvaluatorRelease",
+ "fields": [
+ { "name": "identifier", "type": "string" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.runtime.mesos.util",
+ "type": "record",
+ "name": "EvaluatorControl",
+ "fields": [
+ { "name": "evaluator_launch", "type": ["EvaluatorLaunch", "null"] },
+ { "name": "evaluator_release", "type": ["EvaluatorRelease", "null"] }
+ ]
+ }
+]
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
new file mode 100644
index 0000000..cc051d5
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos;
+
+import net.jcip.annotations.Immutable;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * Access to the classpath according to the REEF file system standard.
+ */
+@Immutable
+public final class MesosClasspathProvider implements RuntimeClasspathProvider {
+ private static final String HADOOP_CONF_DIR = System.getenv("HADOOP_CONF_DIR");
+ private static final String HADOOP_HOME = System.getenv("HADOOP_HOME");
+ private static final String HADOOP_COMMON_HOME = System.getenv("HADOOP_COMMON_HOME");
+ private static final String HADOOP_YARN_HOME = System.getenv("HADOOP_YARN_HOME");
+ private static final String HADOOP_HDFS_HOME = System.getenv("HADOOP_HDFS_HOME");
+ private static final String HADOOP_MAPRED_HOME = System.getenv("HADOOP_MAPRED_HOME");
+
+ // Used when we can't get a classpath from Hadoop
+ private static final String[] LEGACY_CLASSPATH_LIST = new String[]{
+ HADOOP_CONF_DIR,
+ HADOOP_HOME + "/*",
+ HADOOP_HOME + "/lib/*",
+ HADOOP_COMMON_HOME + "/*",
+ HADOOP_COMMON_HOME + "/lib/*",
+ HADOOP_YARN_HOME + "/*",
+ HADOOP_YARN_HOME + "/lib/*",
+ HADOOP_HDFS_HOME + "/*",
+ HADOOP_HDFS_HOME + "/lib/*",
+ HADOOP_MAPRED_HOME + "/*",
+ HADOOP_MAPRED_HOME + "/lib/*",
+ HADOOP_HOME + "/etc/hadoop",
+ HADOOP_HOME + "/share/hadoop/common/*",
+ HADOOP_HOME + "/share/hadoop/common/lib/*",
+ HADOOP_HOME + "/share/hadoop/yarn/*",
+ HADOOP_HOME + "/share/hadoop/yarn/lib/*",
+ HADOOP_HOME + "/share/hadoop/hdfs/*",
+ HADOOP_HOME + "/share/hadoop/hdfs/lib/*",
+ HADOOP_HOME + "/share/hadoop/mapreduce/*",
+ HADOOP_HOME + "/share/hadoop/mapreduce/lib/*"
+ };
+ private final List<String> classPathPrefix;
+ private final List<String> classPathSuffix;
+
+ @Inject
+ MesosClasspathProvider() {
+ this.classPathPrefix = Arrays.asList(LEGACY_CLASSPATH_LIST);
+ this.classPathSuffix = Arrays.asList(LEGACY_CLASSPATH_LIST);
+ }
+
+ @Override
+ public List<String> getDriverClasspathPrefix() {
+ return this.classPathPrefix;
+ }
+
+ @Override
+ public List<String> getDriverClasspathSuffix() {
+ return this.classPathSuffix;
+ }
+
+ @Override
+ public List<String> getEvaluatorClasspathPrefix() {
+ return this.classPathPrefix;
+ }
+
+ @Override
+ public List<String> getEvaluatorClasspathSuffix() {
+ return this.classPathSuffix;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
new file mode 100644
index 0000000..280f2a6
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.client.REEF;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.runtime.common.client.REEFImplementation;
+import org.apache.reef.runtime.common.client.RunningJobImpl;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.mesos.MesosClasspathProvider;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
+import org.apache.reef.runtime.mesos.util.HDFSConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * A ConfigurationModule for the Mesos resource manager
+ */
+@Public
+@ClientSide
+public class MesosClientConfiguration extends ConfigurationModuleBuilder {
+ /**
+ * The folder in which the sub-folders for REEF drivers, one per job, will be created.
+ * If none is given, a folder "REEF_MESOS_RUNTIME" will be created in the local directory.
+ */
+ public static final OptionalParameter<String> ROOT_FOLDER = new OptionalParameter<>();
+
+ /**
+ * The ip address of Mesos Master
+ */
+ public static final RequiredParameter<String> MASTER_IP = new RequiredParameter<>();
+
+ public static final ConfigurationModule CONF = new MesosClientConfiguration()
+ .bindImplementation(REEF.class, REEFImplementation.class)
+ .bindImplementation(RunningJob.class, RunningJobImpl.class)
+ .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+ .bindImplementation(JobSubmissionHandler.class, MesosJobSubmissionHandler.class)
+ .bindNamedParameter(RootFolder.class, ROOT_FOLDER)
+ .bindNamedParameter(MasterIp.class, MASTER_IP)
+ .bindConstructor(Configuration.class, HDFSConfigurationConstructor.class)
+ .bindImplementation(RuntimeClasspathProvider.class, MesosClasspathProvider.class)
+ .build();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
new file mode 100644
index 0000000..d43a855
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos.FileResourceProto;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
+import org.apache.reef.runtime.mesos.driver.MesosDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * The current implementation runs the driver as a local process, similar to reef-runtime-local.
+ * TODO: run the driver on a slave node in the cluster
+ */
+@Private
+@ClientSide
+final class MesosJobSubmissionHandler implements JobSubmissionHandler {
+ public static final String DRIVER_FOLDER_NAME = "driver";
+
+ private final ConfigurationSerializer configurationSerializer;
+ private final ClasspathProvider classpath;
+ private final REEFFileNames fileNames;
+ private final String rootFolderName;
+ private final String masterIp;
+ private final double jvmSlack;
+
+ @Inject
+ MesosJobSubmissionHandler(final @Parameter(RootFolder.class) String rootFolderName,
+ final @Parameter(MasterIp.class) String masterIp,
+ final ConfigurationSerializer configurationSerializer,
+ final REEFFileNames fileNames,
+ final ClasspathProvider classpath,
+ final @Parameter(JVMHeapSlack.class) double jvmSlack) {
+ this.rootFolderName = new File(rootFolderName).getAbsolutePath();
+ this.masterIp = masterIp;
+ this.configurationSerializer = configurationSerializer;
+ this.fileNames = fileNames;
+ this.classpath = classpath;
+ this.jvmSlack = jvmSlack;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+ try {
+ final File jobFolder = new File(new File(this.rootFolderName),
+ "/" + jobSubmissionProto.getIdentifier() + "-" + System.currentTimeMillis() + "/");
+
+ final File driverFolder = new File(jobFolder, DRIVER_FOLDER_NAME);
+ driverFolder.mkdirs();
+
+ final File reefFolder = new File(driverFolder, this.fileNames.getREEFFolderName());
+ reefFolder.mkdirs();
+
+ final File localFolder = new File(reefFolder, this.fileNames.getLocalFolderName());
+ localFolder.mkdirs();
+ for (final FileResourceProto file : jobSubmissionProto.getLocalFileList()) {
+ final Path src = new File(file.getPath()).toPath();
+ final Path dst = new File(driverFolder, this.fileNames.getLocalFolderPath() + "/" + file.getName()).toPath();
+ Files.copy(src, dst);
+ }
+
+ final File globalFolder = new File(reefFolder, this.fileNames.getGlobalFolderName());
+ globalFolder.mkdirs();
+ for (final FileResourceProto file : jobSubmissionProto.getGlobalFileList()) {
+ final Path src = new File(file.getPath()).toPath();
+ final Path dst = new File(driverFolder, this.fileNames.getGlobalFolderPath() + "/" + file.getName()).toPath();
+ Files.copy(src, dst);
+ }
+
+ final Configuration driverConfiguration =
+ Configurations.merge(MesosDriverConfiguration.CONF
+ .set(MesosDriverConfiguration.MESOS_MASTER_IP, this.masterIp)
+ .set(MesosDriverConfiguration.JOB_IDENTIFIER, jobSubmissionProto.getIdentifier())
+ .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionProto.getRemoteId())
+ .set(MesosDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+ .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1) // must be 1 as there is 1 scheduler at the same time
+ .build(),
+ this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
+ final File runtimeConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
+ this.configurationSerializer.toFile(driverConfiguration, runtimeConfigurationFile);
+
+ final List<String> launchCommand = new JavaLaunchCommandBuilder()
+ .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
+ .setLaunchID(jobSubmissionProto.getIdentifier())
+ .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+ .setClassPath(this.classpath.getDriverClasspath())
+ .setMemory(jobSubmissionProto.getDriverMemory())
+ .build();
+
+ final File errFile = new File(driverFolder, fileNames.getDriverStderrFileName());
+ final File outFile = new File(driverFolder, fileNames.getDriverStdoutFileName());
+
+ new ProcessBuilder()
+ .command(launchCommand)
+ .directory(driverFolder)
+ .redirectError(errFile)
+ .redirectOutput(outFile)
+ .start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
new file mode 100644
index 0000000..09c2882
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The ip address of Mesos Master", short_name = "master_ip")
+public final class MasterIp implements Name<String> {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
new file mode 100644
index 0000000..8cdc116
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The root folder where logs etc. will be stored.", default_value = "REEF_MESOS_RUNTIME")
+public final class RootFolder implements Name<String> {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
new file mode 100644
index 0000000..2ef844d
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.mesos.MesosClasspathProvider;
+import org.apache.reef.runtime.mesos.driver.parameters.MesosMasterIp;
+import org.apache.reef.runtime.mesos.util.HDFSConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.StageConfiguration;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.time.Clock;
+
+/**
+ * Binds Driver's runtime event handlers
+ */
+public final class MesosDriverConfiguration extends ConfigurationModuleBuilder {
+ /**
+ * @see AbstractDriverRuntimeConfiguration.JobIdentifier.class
+ */
+ public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>();
+
+ /**
+ * @see AbstractDriverRuntimeConfiguration.EvaluatorTimeout
+ */
+ public static final OptionalParameter<Long> EVALUATOR_TIMEOUT = new OptionalParameter<>();
+
+ /**
+ * The ip address of Mesos Master
+ */
+ public static final RequiredParameter<String> MESOS_MASTER_IP = new RequiredParameter<>();
+
+ /**
+ * The client remote identifier.
+ */
+ public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>();
+
+ /**
+ * The fraction of the container memory NOT to use for the Java Heap.
+ */
+ public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
+
+ /**
+ * Capacity for runnning Mesos Scheduler Driver
+ */
+ public static final RequiredParameter<Integer> SCHEDULER_DRIVER_CAPACITY = new RequiredParameter<>();
+
+ public static ConfigurationModule CONF = new MesosDriverConfiguration()
+ .bindImplementation(ResourceLaunchHandler.class, MesosResourceLaunchHandler.class)
+ .bindImplementation(ResourceReleaseHandler.class, MesosResourceReleaseHandler.class)
+ .bindImplementation(ResourceRequestHandler.class, MesosResourceRequestHandler.class)
+ .bindSetEntry(Clock.RuntimeStartHandler.class, MesosRuntimeStartHandler.class)
+ .bindSetEntry(Clock.RuntimeStopHandler.class, MesosRuntimeStopHandler.class)
+ .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+
+ .bindNamedParameter(MesosMasterIp.class, MESOS_MASTER_IP)
+ .bindConstructor(Configuration.class, HDFSConfigurationConstructor.class)
+ .bindImplementation(RuntimeClasspathProvider.class, MesosClasspathProvider.class)
+
+ .bindNamedParameter(StageConfiguration.Capacity.class, SCHEDULER_DRIVER_CAPACITY)
+ .bindNamedParameter(StageConfiguration.StageHandler.class, MesosSchedulerDriverExecutor.class)
+ .bindImplementation(EStage.class, SingleThreadStage.class)
+
+ // Bind the fields bound in AbstractDriverRuntimeConfiguration
+ .bindNamedParameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class, JOB_IDENTIFIER)
+ .bindNamedParameter(AbstractDriverRuntimeConfiguration.EvaluatorTimeout.class, EVALUATOR_TIMEOUT)
+ .bindNamedParameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER)
+ .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+ .build();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
new file mode 100644
index 0000000..7ce98f8
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.JobJarMaker;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.LaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@DriverSide
+@Private
+final class MesosResourceLaunchHandler implements ResourceLaunchHandler {
+ private final ConfigurationSerializer configurationSerializer;
+ private final RemoteManager remoteManager;
+ private final REEFFileNames fileNames;
+ private final ClasspathProvider classpath;
+ private final double jvmHeapFactor;
+ private final REEFExecutors executors;
+ private static final Logger LOG = Logger.getLogger(MesosResourceLaunchHandler.class.getName());
+
+ @Inject
+ MesosResourceLaunchHandler(final ConfigurationSerializer configurationSerializer,
+ final RemoteManager remoteManager,
+ final REEFFileNames fileNames,
+ final REEFExecutors executors,
+ final ClasspathProvider classpath,
+ final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) {
+ this.configurationSerializer = configurationSerializer;
+ this.remoteManager = remoteManager;
+ this.fileNames = fileNames;
+ this.executors = executors;
+ this.classpath = classpath;
+ this.jvmHeapFactor = 1.0 - jvmHeapSlack;
+ }
+
+
+ @Override
+ public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) {
+ try {
+ LOG.log(Level.INFO, "resourceLaunchProto. {0}", resourceLaunchProto.toString());
+
+ final File localStagingFolder =
+ Files.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix()).toFile();
+
+ final Configuration evaluatorConfiguration = Tang.Factory.getTang()
+ .newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf()))
+ .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+ .build();
+
+ final File configurationFile = new File(
+ localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
+ this.configurationSerializer.toFile(evaluatorConfiguration, configurationFile);
+
+ JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder);
+
+ final FileSystem fileSystem = FileSystem.get(new org.apache.hadoop.conf.Configuration());
+ final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + resourceLaunchProto.getIdentifier() + "/");
+ FileUtil.copy(localStagingFolder, fileSystem, hdfsFolder, false, new org.apache.hadoop.conf.Configuration());
+
+ // TODO: Replace REEFExecutor with a simple launch command (we only need to launch REEFExecutor)
+ final LaunchCommandBuilder commandBuilder;
+ switch (resourceLaunchProto.getType()) {
+ case JVM:
+ commandBuilder = new JavaLaunchCommandBuilder().setClassPath(this.classpath.getEvaluatorClasspath());
+ break;
+ case CLR:
+ commandBuilder = new CLRLaunchCommandBuilder();
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported container type");
+ }
+
+ final List<String> command = commandBuilder
+ .setErrorHandlerRID(this.remoteManager.getMyIdentifier())
+ .setLaunchID(resourceLaunchProto.getIdentifier())
+ .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath())
+ .setMemory((int) (this.jvmHeapFactor * this.executors.getMemory(resourceLaunchProto.getIdentifier())))
+ .build();
+
+ this.executors.launchEvaluator(
+ new EvaluatorLaunch(resourceLaunchProto.getIdentifier(), StringUtils.join(command, ' ')));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
new file mode 100644
index 0000000..41c487e
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+
+import javax.inject.Inject;
+
+@DriverSide
+@Private
+final class MesosResourceReleaseHandler implements ResourceReleaseHandler {
+ private final REEFScheduler REEFScheduler;
+
+ @Inject
+ MesosResourceReleaseHandler(final REEFScheduler REEFScheduler) {
+ this.REEFScheduler = REEFScheduler;
+ }
+
+ @Override
+ public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto resourceReleaseProto) {
+ REEFScheduler.onResourceRelease(resourceReleaseProto);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
new file mode 100644
index 0000000..a9c1016
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+
+import javax.inject.Inject;
+
+@DriverSide
+@Private
+final class MesosResourceRequestHandler implements ResourceRequestHandler {
+ private final REEFScheduler REEFScheduler;
+
+ @Inject
+ MesosResourceRequestHandler(final REEFScheduler REEFScheduler) {
+ this.REEFScheduler = REEFScheduler;
+ }
+
+ @Override
+ public void onNext(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
+ REEFScheduler.onResourceRequest(resourceRequestProto);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
new file mode 100644
index 0000000..c29e780
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+
+final class MesosRuntimeStartHandler implements EventHandler<RuntimeStart> {
+ private final REEFScheduler REEFScheduler;
+
+ @Inject
+ MesosRuntimeStartHandler(final REEFScheduler REEFScheduler) {
+ this.REEFScheduler = REEFScheduler;
+ }
+
+ @Override
+ public void onNext(final RuntimeStart runtimeStart){
+ this.REEFScheduler.onStart();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
new file mode 100644
index 0000000..3d3b86e
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+
+final class MesosRuntimeStopHandler implements EventHandler<RuntimeStop> {
+ private final REEFScheduler REEFScheduler;
+
+ @Inject
+ MesosRuntimeStopHandler(final REEFScheduler REEFScheduler) {
+ this.REEFScheduler = REEFScheduler;
+ }
+
+ @Override
+ public void onNext(final RuntimeStop runtimeStop) {
+ this.REEFScheduler.onStop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java
new file mode 100644
index 0000000..abde514
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class MesosSchedulerDriverExecutor implements EventHandler<SchedulerDriver> {
+ private static final Logger LOG = Logger.getLogger(MesosSchedulerDriverExecutor.class.getName());
+
+ @Inject
+ public MesosSchedulerDriverExecutor() {
+ }
+
+ @Override
+ public void onNext(final SchedulerDriver schedulerDriver) {
+ LOG.log(Level.INFO, "MesosMaster(SchedulerDriver) starting");
+ final Protos.Status status = schedulerDriver.run();
+ LOG.log(Level.INFO, "MesosMaster(SchedulerDriver) ended with status {0}", status);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
new file mode 100644
index 0000000..fd5cce2
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceStatusProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+@Private
+final class REEFEventHandlers {
+ private final EventHandler<ResourceAllocationProto> resourceAllocationEventHandler;
+ private final EventHandler<RuntimeStatusProto> runtimeStatusEventHandler;
+ private final EventHandler<NodeDescriptorProto> nodeDescriptorEventHandler;
+ private final EventHandler<ResourceStatusProto> resourceStatusHandlerEventHandler;
+
+ @Inject
+ REEFEventHandlers(final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<ResourceAllocationProto> resourceAllocationEventHandler,
+ final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<RuntimeStatusProto> runtimeStatusEventHandler,
+ final @Parameter(RuntimeParameters.NodeDescriptorHandler.class) EventHandler<NodeDescriptorProto> nodeDescriptorEventHandler,
+ final @Parameter(RuntimeParameters.ResourceStatusHandler.class) EventHandler<ResourceStatusProto> resourceStatusHandlerEventHandler) {
+ this.resourceAllocationEventHandler = resourceAllocationEventHandler;
+ this.runtimeStatusEventHandler = runtimeStatusEventHandler;
+ this.nodeDescriptorEventHandler = nodeDescriptorEventHandler;
+ this.resourceStatusHandlerEventHandler = resourceStatusHandlerEventHandler;
+ }
+
+ void onNodeDescriptor(final NodeDescriptorProto nodeDescriptorProto) {
+ this.nodeDescriptorEventHandler.onNext(nodeDescriptorProto);
+ }
+
+ void onRuntimeStatus(final RuntimeStatusProto runtimeStatusProto) {
+ this.runtimeStatusEventHandler.onNext(runtimeStatusProto);
+ }
+
+ void onResourceAllocation(final ResourceAllocationProto resourceAllocationProto) {
+ this.resourceAllocationEventHandler.onNext(resourceAllocationProto);
+ }
+
+ void onResourceStatus(final ResourceStatusProto resourceStatusProto) {
+ this.resourceStatusHandlerEventHandler.onNext(resourceStatusProto);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java
new file mode 100644
index 0000000..be6045a
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * The Driver's view of a REEFExecutor running in the cluster.
+ */
+final class REEFExecutor {
+ private final int memory;
+ private final EventHandler<EvaluatorControl> evaluatorControlHandler;
+
+ REEFExecutor(final int memory,
+ final EventHandler<EvaluatorControl> evaluatorControlHandler) {
+ this.memory = memory;
+ this.evaluatorControlHandler = evaluatorControlHandler;
+ }
+
+ public void launchEvaluator(final EvaluatorLaunch evaluatorLaunch) {
+ this.evaluatorControlHandler.onNext(new EvaluatorControl(evaluatorLaunch, null));
+ }
+
+ public void releaseEvaluator(final EvaluatorRelease evaluatorRelease) {
+ this.evaluatorControlHandler.onNext(new EvaluatorControl(null, evaluatorRelease));
+ }
+
+ public int getMemory() {
+ return this.memory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c908a526/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java
----------------------------------------------------------------------
diff --git a/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java
new file mode 100644
index 0000000..f70e3fe
--- /dev/null
+++ b/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The Driver's view of MesosExecutors running in the cluster.
+ */
+final class REEFExecutors {
+ private final Map<String, REEFExecutor> executors = new ConcurrentHashMap<>();
+
+ @Inject
+ REEFExecutors() {
+ }
+
+ public void add(final String id,
+ final int memory,
+ final EventHandler<EvaluatorControl> evaluatorControlHandler) {
+ executors.put(id, new REEFExecutor(memory, evaluatorControlHandler));
+ }
+
+ public void remove(final String id) {
+ this.executors.remove(id);
+ }
+
+ public Set<String> getExecutorIds() { return executors.keySet(); }
+
+ public int getMemory(final String id) {
+ return executors.get(id).getMemory();
+ }
+
+ public void launchEvaluator(final EvaluatorLaunch evaluatorLaunch) {
+ executors.get(evaluatorLaunch.getIdentifier().toString()).launchEvaluator(evaluatorLaunch);
+ }
+
+ public void releaseEvaluator(final EvaluatorRelease evaluatorRelease) {
+ executors.get(evaluatorRelease.getIdentifier().toString()).releaseEvaluator(evaluatorRelease);
+ }
+}
\ No newline at end of file