You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:53 UTC
[20/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
new file mode 100644
index 0000000..2ef844d
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.mesos.MesosClasspathProvider;
+import org.apache.reef.runtime.mesos.driver.parameters.MesosMasterIp;
+import org.apache.reef.runtime.mesos.util.HDFSConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.StageConfiguration;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.time.Clock;
+
+/**
+ * Configuration module for the Mesos driver runtime.
+ * Binds the Driver's runtime event handlers and declares the parameters
+ * that have to (or may) be set on {@link #CONF} before it is built.
+ */
+public final class MesosDriverConfiguration extends ConfigurationModuleBuilder {
+  /**
+   * @see AbstractDriverRuntimeConfiguration.JobIdentifier
+   */
+  public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>();
+
+  /**
+   * @see AbstractDriverRuntimeConfiguration.EvaluatorTimeout
+   */
+  public static final OptionalParameter<Long> EVALUATOR_TIMEOUT = new OptionalParameter<>();
+
+  /**
+   * The IP address of the Mesos Master.
+   */
+  public static final RequiredParameter<String> MESOS_MASTER_IP = new RequiredParameter<>();
+
+  /**
+   * The client remote identifier.
+   */
+  public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>();
+
+  /**
+   * The fraction of the container memory NOT to use for the Java Heap.
+   */
+  public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
+
+  /**
+   * Capacity for running the Mesos Scheduler Driver.
+   * Bound to {@link StageConfiguration.Capacity}.
+   */
+  public static final RequiredParameter<Integer> SCHEDULER_DRIVER_CAPACITY = new RequiredParameter<>();
+
+  /**
+   * The module itself. Declared {@code final} so that the shared instance
+   * cannot be reassigned by other code.
+   */
+  public static final ConfigurationModule CONF = new MesosDriverConfiguration()
+      // Runtime event handlers of the Driver
+      .bindImplementation(ResourceLaunchHandler.class, MesosResourceLaunchHandler.class)
+      .bindImplementation(ResourceReleaseHandler.class, MesosResourceReleaseHandler.class)
+      .bindImplementation(ResourceRequestHandler.class, MesosResourceRequestHandler.class)
+      .bindSetEntry(Clock.RuntimeStartHandler.class, MesosRuntimeStartHandler.class)
+      .bindSetEntry(Clock.RuntimeStopHandler.class, MesosRuntimeStopHandler.class)
+      .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+
+      // Mesos master address, Hadoop configuration and classpath
+      .bindNamedParameter(MesosMasterIp.class, MESOS_MASTER_IP)
+      .bindConstructor(Configuration.class, HDFSConfigurationConstructor.class)
+      .bindImplementation(RuntimeClasspathProvider.class, MesosClasspathProvider.class)
+
+      // Stage that hands the SchedulerDriver to MesosSchedulerDriverExecutor
+      .bindNamedParameter(StageConfiguration.Capacity.class, SCHEDULER_DRIVER_CAPACITY)
+      .bindNamedParameter(StageConfiguration.StageHandler.class, MesosSchedulerDriverExecutor.class)
+      .bindImplementation(EStage.class, SingleThreadStage.class)
+
+      // Bind the fields bound in AbstractDriverRuntimeConfiguration
+      .bindNamedParameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class, JOB_IDENTIFIER)
+      .bindNamedParameter(AbstractDriverRuntimeConfiguration.EvaluatorTimeout.class, EVALUATOR_TIMEOUT)
+      .bindNamedParameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER)
+      .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+      .build();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
new file mode 100644
index 0000000..7ce98f8
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.JobJarMaker;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.LaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Handles resource launch requests on the Mesos runtime: stages the evaluator's
+ * configuration and files, copies them to the Hadoop FileSystem and asks the
+ * matching executor to launch the evaluator.
+ */
+@DriverSide
+@Private
+final class MesosResourceLaunchHandler implements ResourceLaunchHandler {
+  private static final Logger LOG = Logger.getLogger(MesosResourceLaunchHandler.class.getName());
+
+  private final ConfigurationSerializer configurationSerializer;
+  private final RemoteManager remoteManager;
+  private final REEFFileNames fileNames;
+  private final ClasspathProvider classpath;
+  private final double jvmHeapFactor;
+  private final REEFExecutors executors;
+
+  /**
+   * @param jvmHeapSlack fraction of the memory that is NOT handed to the launch command;
+   *                     the remaining fraction ({@code 1.0 - jvmHeapSlack}) is stored as the heap factor.
+   */
+  @Inject
+  MesosResourceLaunchHandler(final ConfigurationSerializer configurationSerializer,
+                             final RemoteManager remoteManager,
+                             final REEFFileNames fileNames,
+                             final REEFExecutors executors,
+                             final ClasspathProvider classpath,
+                             final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) {
+    this.jvmHeapFactor = 1.0 - jvmHeapSlack;
+    this.classpath = classpath;
+    this.executors = executors;
+    this.fileNames = fileNames;
+    this.remoteManager = remoteManager;
+    this.configurationSerializer = configurationSerializer;
+  }
+
+  /**
+   * Stages the evaluator in a fresh local temp directory, copies that directory to
+   * {@code <filesystem uri>/<identifier>/}, assembles the launch command and passes it
+   * to the executor registered under the request's identifier.
+   *
+   * @param resourceLaunchProto the launch request
+   * @throws RuntimeException         wrapping any IOException raised while staging or copying
+   * @throws IllegalArgumentException if the requested container type is neither JVM nor CLR
+   */
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) {
+    try {
+      LOG.log(Level.INFO, "resourceLaunchProto. {0}", resourceLaunchProto.toString());
+
+      final String evaluatorId = resourceLaunchProto.getIdentifier();
+
+      // Local staging area for everything the evaluator needs
+      final File stagingDir =
+          Files.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix()).toFile();
+
+      // Evaluator configuration: the one sent with the request plus the TempFileCreator binding
+      final Tang tang = Tang.Factory.getTang();
+      final Configuration requestedConf =
+          this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf());
+      final Configuration evaluatorConf = tang
+          .newConfigurationBuilder(requestedConf)
+          .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+          .build();
+
+      final File evaluatorConfFile =
+          new File(stagingDir, this.fileNames.getEvaluatorConfigurationName());
+      this.configurationSerializer.toFile(evaluatorConf, evaluatorConfFile);
+
+      JobJarMaker.copy(resourceLaunchProto.getFileList(), stagingDir);
+
+      // Upload the staging directory; the local copy is kept (deleteSource == false)
+      final FileSystem fs = FileSystem.get(new org.apache.hadoop.conf.Configuration());
+      final Path remoteFolder = new Path(fs.getUri() + "/" + evaluatorId + "/");
+      FileUtil.copy(stagingDir, fs, remoteFolder, false, new org.apache.hadoop.conf.Configuration());
+
+      // TODO: Replace REEFExecutor with a simple launch command (we only need to launch REEFExecutor)
+      final LaunchCommandBuilder builder;
+      switch (resourceLaunchProto.getType()) {
+        case JVM:
+          builder = new JavaLaunchCommandBuilder().setClassPath(this.classpath.getEvaluatorClasspath());
+          break;
+        case CLR:
+          builder = new CLRLaunchCommandBuilder();
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported container type");
+      }
+
+      final List<String> launchCommand = builder
+          .setErrorHandlerRID(this.remoteManager.getMyIdentifier())
+          .setLaunchID(evaluatorId)
+          .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath())
+          .setMemory(scaledMemory(evaluatorId))
+          .build();
+
+      final String commandLine = StringUtils.join(launchCommand, ' ');
+      this.executors.launchEvaluator(new EvaluatorLaunch(evaluatorId, commandLine));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Memory of the executor registered under the given id, scaled by the heap factor
+   * and truncated to an int.
+   */
+  private int scaledMemory(final String evaluatorId) {
+    return (int) (this.jvmHeapFactor * this.executors.getMemory(evaluatorId));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
new file mode 100644
index 0000000..41c487e
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Forwards resource release requests to the {@link REEFScheduler}.
+ */
+@DriverSide
+@Private
+final class MesosResourceReleaseHandler implements ResourceReleaseHandler {
+  private final REEFScheduler scheduler;
+
+  @Inject
+  MesosResourceReleaseHandler(final REEFScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Hands the release request to the scheduler unchanged.
+   */
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto resourceReleaseProto) {
+    this.scheduler.onResourceRelease(resourceReleaseProto);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
new file mode 100644
index 0000000..a9c1016
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Forwards resource requests to the {@link REEFScheduler}.
+ */
+@DriverSide
+@Private
+final class MesosResourceRequestHandler implements ResourceRequestHandler {
+  private final REEFScheduler scheduler;
+
+  @Inject
+  MesosResourceRequestHandler(final REEFScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Hands the resource request to the scheduler unchanged.
+   */
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
+    this.scheduler.onResourceRequest(resourceRequestProto);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
new file mode 100644
index 0000000..c29e780
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+
+/**
+ * Starts the {@link REEFScheduler} when the runtime start event arrives.
+ */
+final class MesosRuntimeStartHandler implements EventHandler<RuntimeStart> {
+  private final REEFScheduler scheduler;
+
+  @Inject
+  MesosRuntimeStartHandler(final REEFScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Calls {@code onStart()} on the scheduler; the event itself is not inspected.
+   */
+  @Override
+  public void onNext(final RuntimeStart runtimeStart) {
+    this.scheduler.onStart();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
new file mode 100644
index 0000000..3d3b86e
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+
+/**
+ * Stops the {@link REEFScheduler} when the runtime stop event arrives.
+ */
+final class MesosRuntimeStopHandler implements EventHandler<RuntimeStop> {
+  private final REEFScheduler scheduler;
+
+  @Inject
+  MesosRuntimeStopHandler(final REEFScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Calls {@code onStop()} on the scheduler; the event itself is not inspected.
+   */
+  @Override
+  public void onNext(final RuntimeStop runtimeStop) {
+    this.scheduler.onStop();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java
new file mode 100644
index 0000000..abde514
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosSchedulerDriverExecutor.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Event handler that runs a Mesos {@link SchedulerDriver} and logs the status it ends with.
+ */
+public class MesosSchedulerDriverExecutor implements EventHandler<SchedulerDriver> {
+  private static final Logger LOG = Logger.getLogger(MesosSchedulerDriverExecutor.class.getName());
+
+  @Inject
+  public MesosSchedulerDriverExecutor() {
+  }
+
+  /**
+   * Invokes {@code run()} on the given driver and logs the returned status.
+   *
+   * @param schedulerDriver the driver to run
+   */
+  @Override
+  public void onNext(final SchedulerDriver schedulerDriver) {
+    LOG.info("MesosMaster(SchedulerDriver) starting");
+    final Protos.Status finalStatus = schedulerDriver.run();
+    LOG.log(Level.INFO, "MesosMaster(SchedulerDriver) ended with status {0}", finalStatus);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
new file mode 100644
index 0000000..fd5cce2
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceStatusProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Bundles the four runtime event handlers injected via {@link RuntimeParameters}
+ * and offers one forwarding method per event type.
+ */
+@Private
+final class REEFEventHandlers {
+  private final EventHandler<ResourceAllocationProto> allocationHandler;
+  private final EventHandler<RuntimeStatusProto> runtimeStatusHandler;
+  private final EventHandler<NodeDescriptorProto> nodeDescriptorHandler;
+  private final EventHandler<ResourceStatusProto> resourceStatusHandler;
+
+  @Inject
+  REEFEventHandlers(
+      final @Parameter(RuntimeParameters.ResourceAllocationHandler.class)
+          EventHandler<ResourceAllocationProto> resourceAllocationEventHandler,
+      final @Parameter(RuntimeParameters.RuntimeStatusHandler.class)
+          EventHandler<RuntimeStatusProto> runtimeStatusEventHandler,
+      final @Parameter(RuntimeParameters.NodeDescriptorHandler.class)
+          EventHandler<NodeDescriptorProto> nodeDescriptorEventHandler,
+      final @Parameter(RuntimeParameters.ResourceStatusHandler.class)
+          EventHandler<ResourceStatusProto> resourceStatusHandlerEventHandler) {
+    this.allocationHandler = resourceAllocationEventHandler;
+    this.runtimeStatusHandler = runtimeStatusEventHandler;
+    this.nodeDescriptorHandler = nodeDescriptorEventHandler;
+    this.resourceStatusHandler = resourceStatusHandlerEventHandler;
+  }
+
+  /** Passes the node descriptor to the NodeDescriptorHandler. */
+  void onNodeDescriptor(final NodeDescriptorProto nodeDescriptorProto) {
+    this.nodeDescriptorHandler.onNext(nodeDescriptorProto);
+  }
+
+  /** Passes the runtime status to the RuntimeStatusHandler. */
+  void onRuntimeStatus(final RuntimeStatusProto runtimeStatusProto) {
+    this.runtimeStatusHandler.onNext(runtimeStatusProto);
+  }
+
+  /** Passes the resource allocation to the ResourceAllocationHandler. */
+  void onResourceAllocation(final ResourceAllocationProto resourceAllocationProto) {
+    this.allocationHandler.onNext(resourceAllocationProto);
+  }
+
+  /** Passes the resource status to the ResourceStatusHandler. */
+  void onResourceStatus(final ResourceStatusProto resourceStatusProto) {
+    this.resourceStatusHandler.onNext(resourceStatusProto);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java
new file mode 100644
index 0000000..be6045a
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * The Driver's view of a REEFExecutor running in the cluster:
+ * its memory value and the handler used to send it control messages.
+ */
+final class REEFExecutor {
+  private final int memory;
+  private final EventHandler<EvaluatorControl> controlHandler;
+
+  REEFExecutor(final int memory,
+               final EventHandler<EvaluatorControl> evaluatorControlHandler) {
+    this.controlHandler = evaluatorControlHandler;
+    this.memory = memory;
+  }
+
+  /**
+   * Sends a control message that carries only the given launch request.
+   */
+  public void launchEvaluator(final EvaluatorLaunch evaluatorLaunch) {
+    final EvaluatorControl launchMessage = new EvaluatorControl(evaluatorLaunch, null);
+    this.controlHandler.onNext(launchMessage);
+  }
+
+  /**
+   * Sends a control message that carries only the given release request.
+   */
+  public void releaseEvaluator(final EvaluatorRelease evaluatorRelease) {
+    final EvaluatorControl releaseMessage = new EvaluatorControl(null, evaluatorRelease);
+    this.controlHandler.onNext(releaseMessage);
+  }
+
+  /**
+   * @return the memory value this executor was created with
+   */
+  public int getMemory() {
+    return this.memory;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java
new file mode 100644
index 0000000..f70e3fe
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFExecutors.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The Driver's view of MesosExecutors running in the cluster,
+ * kept in a concurrent map keyed by executor id.
+ */
+final class REEFExecutors {
+  private final Map<String, REEFExecutor> executors = new ConcurrentHashMap<>();
+
+  @Inject
+  REEFExecutors() {
+  }
+
+  /**
+   * Registers an executor under the given id, replacing any previous entry for that id.
+   *
+   * @param id                      the executor id
+   * @param memory                  the executor's memory value
+   * @param evaluatorControlHandler handler used to send control messages to the executor
+   */
+  public void add(final String id,
+                  final int memory,
+                  final EventHandler<EvaluatorControl> evaluatorControlHandler) {
+    this.executors.put(id, new REEFExecutor(memory, evaluatorControlHandler));
+  }
+
+  /**
+   * Removes the executor registered under the given id, if any.
+   */
+  public void remove(final String id) {
+    this.executors.remove(id);
+  }
+
+  /**
+   * @return the key set of the underlying map (a live view, not a copy)
+   */
+  public Set<String> getExecutorIds() {
+    return this.executors.keySet();
+  }
+
+  /**
+   * @return the memory value of the executor registered under the given id
+   * @throws IllegalArgumentException if no executor is registered under that id
+   */
+  public int getMemory(final String id) {
+    return lookup(id).getMemory();
+  }
+
+  /**
+   * Forwards the launch request to the executor named by its identifier.
+   *
+   * @throws IllegalArgumentException if no executor is registered under that identifier
+   */
+  public void launchEvaluator(final EvaluatorLaunch evaluatorLaunch) {
+    lookup(evaluatorLaunch.getIdentifier().toString()).launchEvaluator(evaluatorLaunch);
+  }
+
+  /**
+   * Forwards the release request to the executor named by its identifier.
+   *
+   * @throws IllegalArgumentException if no executor is registered under that identifier
+   */
+  public void releaseEvaluator(final EvaluatorRelease evaluatorRelease) {
+    lookup(evaluatorRelease.getIdentifier().toString()).releaseEvaluator(evaluatorRelease);
+  }
+
+  /**
+   * Resolves an id to its executor. Fails with a message naming the id instead of
+   * letting callers hit a bare NullPointerException when the id is unknown
+   * (never added, or already removed).
+   */
+  private REEFExecutor lookup(final String id) {
+    final REEFExecutor executor = this.executors.get(id);
+    if (executor == null) {
+      throw new IllegalArgumentException("Unknown executor: " + id);
+    }
+    return executor;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
new file mode 100644
index 0000000..9c2c6d9
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver;
+
+import com.google.protobuf.ByteString;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceReleaseProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceRequestProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto.Builder;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.proto.ReefServiceProtos.State;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.mesos.driver.parameters.MesosMasterIp;
+import org.apache.reef.runtime.mesos.evaluator.REEFExecutor;
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.runtime.mesos.util.MesosRemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.Encoder;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.CommandInfo.URI;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Filters;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Protos.Value.Type;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import javax.inject.Inject;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * MesosScheduler that interacts with MesosMaster and MesosExecutors.
+ */
+final class REEFScheduler implements Scheduler {
+ private static final Logger LOG = Logger.getLogger(REEFScheduler.class.getName());
+ private static final String REEF_TAR = "reef.tar.gz";
+ private static final String RUNTIME_NAME = "MESOS";
+ private static final int MESOS_SLAVE_PORT = 5051; // Assumes for now that all slaves use port 5051(default) TODO: make it configurable.
+ private static final String REEF_JOB_NAME_PREFIX = "reef-job-";
+
+ private final String reefTarUri;
+ private final REEFFileNames fileNames;
+ private final ClasspathProvider classpath;
+
+ private final REEFEventHandlers reefEventHandlers;
+ private final MesosRemoteManager mesosRemoteManager;
+
+ private final SchedulerDriver mesosMaster;
+ private final EStage<SchedulerDriver> schedulerDriverEStage;
+ private final Map<String, Offer> offers = new ConcurrentHashMap<>();
+
+ private int outstandingRequestCounter = 0;
+ private final ConcurrentLinkedQueue<ResourceRequestProto> outstandingRequests = new ConcurrentLinkedQueue<>();
+ private final Map<String, ResourceRequestProto> executorIdToLaunchedRequests = new ConcurrentHashMap<>();
+ private final REEFExecutors executors;
+
+ /**
+ * Stores the injected collaborators, builds and uploads the REEF tarball, and creates the
+ * MesosSchedulerDriver for this framework. The driver is only constructed here.
+ *
+ * @param reefEventHandlers sink for node descriptors, allocations, resource and runtime status
+ * @param mesosRemoteManager used to obtain handlers for messaging executors
+ * @param executors registry of the executors known to the driver
+ * @param fileNames provides the global folder path used when building the tarball
+ * @param schedulerDriverEStage stage that receives the Mesos driver in onStart()
+ * @param classpath provides the evaluator classpath for the executor launch command
+ * @param jobIdentifier used in the framework name and in the tarball's upload path
+ * @param masterIp passed to MesosSchedulerDriver as the master address
+ */
+ @Inject
+ REEFScheduler(final REEFEventHandlers reefEventHandlers,
+ final MesosRemoteManager mesosRemoteManager,
+ final REEFExecutors executors,
+ final REEFFileNames fileNames,
+ final EStage<SchedulerDriver> schedulerDriverEStage,
+ final ClasspathProvider classpath,
+ final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier,
+ final @Parameter(MesosMasterIp.class) String masterIp) {
+ this.mesosRemoteManager = mesosRemoteManager;
+ this.reefEventHandlers = reefEventHandlers;
+ this.executors = executors;
+ this.fileNames = fileNames;
+ // Must come after the fileNames assignment: getReefTarUri reads this.fileNames.
+ // It performs file and HDFS I/O and throws RuntimeException if that fails.
+ this.reefTarUri = getReefTarUri(jobIdentifier);
+ this.classpath = classpath;
+ this.schedulerDriverEStage = schedulerDriverEStage;
+
+ final Protos.FrameworkInfo frameworkInfo = Protos.FrameworkInfo.newBuilder()
+ .setUser("") // TODO: make it configurable.
+ .setName(REEF_JOB_NAME_PREFIX + jobIdentifier)
+ .build();
+ // Note: `this` is handed to the driver before the constructor has returned.
+ this.mesosMaster = new MesosSchedulerDriver(this, frameworkInfo, masterIp);
+ }
+
+  /**
+   * Logs the framework id once registration with the master has succeeded.
+   */
+  @Override
+  public void registered(final SchedulerDriver driver,
+                         final Protos.FrameworkID frameworkId,
+                         final Protos.MasterInfo masterInfo) {
+    final Object[] logArguments = {frameworkId};
+    LOG.log(Level.INFO, "Framework ID={0} registration succeeded", logArguments);
+  }
+
+  /**
+   * Logs the master info when the framework is re-registered.
+   */
+  @Override
+  public void reregistered(final SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
+    final Object[] logArguments = {masterInfo};
+    LOG.log(Level.INFO, "Framework reregistered, MasterInfo: {0}", logArguments);
+  }
+
+  /**
+   * Stores a batch of offers, reports the memory they represent per slave, and then serves at most
+   * one outstanding request. When a request is served, doResourceRequest either launches on or
+   * declines every stored offer.
+   *
+   * @param driver the Mesos driver delivering the offers (not used)
+   * @param offers the newly received offers
+   */
+  @Override
+  public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) {
+    final Map<String, NodeDescriptorProto.Builder> nodeDescriptorProtos = new HashMap<>();
+
+    for (final Offer offer : offers) {
+      final String slaveId = offer.getSlaveId().getValue();
+      final NodeDescriptorProto.Builder existing = nodeDescriptorProtos.get(slaveId);
+      if (existing == null) {
+        nodeDescriptorProtos.put(slaveId, NodeDescriptorProto.newBuilder()
+            .setIdentifier(slaveId)
+            .setHostName(offer.getHostname())
+            .setPort(MESOS_SLAVE_PORT)
+            .setMemorySize(getMemory(offer)));
+      } else {
+        // Several offers from the same slave in one batch: add up their memory.
+        existing.setMemorySize(existing.getMemorySize() + getMemory(offer));
+      }
+
+      this.offers.put(offer.getId().getValue(), offer);
+    }
+
+    for (final NodeDescriptorProto.Builder ndpBuilder : nodeDescriptorProtos.values()) {
+      this.reefEventHandlers.onNodeDescriptor(ndpBuilder.build());
+    }
+
+    // poll() takes the head atomically. The previous size() check followed by remove() could
+    // throw NoSuchElementException if the queue was emptied in between.
+    final ResourceRequestProto outstandingRequest = this.outstandingRequests.poll();
+    if (outstandingRequest != null) {
+      doResourceRequest(outstandingRequest);
+    }
+  }
+
+ @Override
+ public void offerRescinded(final SchedulerDriver driver, final Protos.OfferID offerId) {
+ for (final String executorId : this.executorIdToLaunchedRequests.keySet()) {
+ if (executorId.startsWith(offerId.getValue())) {
+ this.outstandingRequests.add(this.executorIdToLaunchedRequests.remove(executorId));
+ }
+ }
+ }
+
+  /**
+   * Translates a Mesos task status into a REEF resource status and forwards it.
+   * TASK_STARTING is handled separately as the arrival of a new executor, and a TASK_FINISHED
+   * update carrying the "eval_not_run" marker is dropped.
+   *
+   * @param driver the Mesos driver (not used)
+   * @param taskStatus the status reported for a task
+   * @throws RuntimeException on TASK_STAGING or a state this method does not know
+   */
+  @Override
+  public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus taskStatus) {
+    // The format string needs the {0} placeholder, otherwise the status is never printed.
+    // NOTE(review): SEVERE looks high for a routine update; the level is left unchanged here.
+    LOG.log(Level.SEVERE, "Task Status Update: {0}", taskStatus);
+
+    final DriverRuntimeProtocol.ResourceStatusProto.Builder resourceStatus =
+        DriverRuntimeProtocol.ResourceStatusProto.newBuilder().setIdentifier(taskStatus.getTaskId().getValue());
+
+    switch (taskStatus.getState()) {
+      case TASK_STARTING:
+        handleNewExecutor(taskStatus); // As there is only one Mesos Task per Mesos Executor, this is a new executor.
+        return;
+      case TASK_RUNNING:
+        resourceStatus.setState(State.RUNNING);
+        break;
+      case TASK_FINISHED:
+        if (taskStatus.getData().toStringUtf8().equals("eval_not_run")) { // TODO: a hack to pass closeEvaluator test, replace this with a better interface
+          return;
+        }
+        resourceStatus.setState(State.DONE);
+        break;
+      case TASK_KILLED:
+        resourceStatus.setState(State.KILLED);
+        break;
+      case TASK_LOST:
+      case TASK_FAILED:
+        resourceStatus.setState(State.FAILED);
+        break;
+      case TASK_STAGING:
+        throw new RuntimeException("TASK_STAGING should not be used for status update");
+      default:
+        throw new RuntimeException("Unknown TaskStatus");
+    }
+
+    // Protobuf getters never return null, so presence has to be tested with hasMessage().
+    if (taskStatus.hasMessage()) {
+      resourceStatus.setDiagnostics(taskStatus.getMessage());
+    }
+
+    this.reefEventHandlers.onResourceStatus(resourceStatus.build());
+  }
+
+  /**
+   * Logs a framework message received from an executor; the message is not processed further.
+   */
+  @Override
+  public void frameworkMessage(final SchedulerDriver driver,
+                               final Protos.ExecutorID executorId,
+                               final Protos.SlaveID slaveId,
+                               final byte[] data) {
+    final Object[] logArguments = new Object[]{driver, executorId, slaveId, data};
+    LOG.log(Level.INFO, "Framework Message. driver: {0} executorId: {1} slaveId: {2} data: {3}", logArguments);
+  }
+
+  /**
+   * Treats a disconnect from the master as a runtime error.
+   */
+  @Override
+  public void disconnected(final SchedulerDriver driver) {
+    final RuntimeException cause = new RuntimeException("Scheduler disconnected from MesosMaster");
+    onRuntimeError(cause);
+  }
+
+  /**
+   * Logs the id of a lost slave; no other action is taken.
+   */
+  @Override
+  public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID slaveId) {
+    final String lostSlaveId = slaveId.getValue();
+    LOG.log(Level.SEVERE, "Slave Lost. {0}", lostSlaveId);
+  }
+
+  /**
+   * Reports a lost executor as a FAILED resource, with the exit status and a diagnostics string
+   * naming the executor and the slave.
+   */
+  @Override
+  public void executorLost(final SchedulerDriver driver,
+                           final Protos.ExecutorID executorId,
+                           final Protos.SlaveID slaveId,
+                           final int status) {
+    final String lostExecutorId = executorId.getValue();
+    final String diagnostics = new StringBuilder("Executor Lost. executorid: ")
+        .append(lostExecutorId)
+        .append(" slaveid: ")
+        .append(slaveId.getValue())
+        .toString();
+
+    final DriverRuntimeProtocol.ResourceStatusProto.Builder lostStatus =
+        DriverRuntimeProtocol.ResourceStatusProto.newBuilder();
+    lostStatus.setIdentifier(lostExecutorId);
+    lostStatus.setState(State.FAILED);
+    lostStatus.setExitCode(status);
+    lostStatus.setDiagnostics(diagnostics);
+
+    this.reefEventHandlers.onResourceStatus(lostStatus.build());
+  }
+
+  /**
+   * Treats an error reported by Mesos as a runtime error carrying the given message.
+   */
+  @Override
+  public void error(final SchedulerDriver driver, final String message) {
+    final RuntimeException cause = new RuntimeException(message);
+    onRuntimeError(cause);
+  }
+
+ /////////////////////////////////////////////////////////////////
+ // HELPER METHODS
+
+  /**
+   * Hands the Mesos scheduler driver to the stage.
+   */
+  public void onStart() {
+    final SchedulerDriver schedulerDriver = this.mesosMaster;
+    this.schedulerDriverEStage.onNext(schedulerDriver);
+  }
+
+  /**
+   * Stops the Mesos driver and then closes the stage.
+   *
+   * @throws RuntimeException wrapping any exception thrown while closing the stage
+   */
+  public void onStop() {
+    this.mesosMaster.stop();
+    try {
+      this.schedulerDriverEStage.close();
+    } catch (final Exception closeFailure) {
+      throw new RuntimeException(closeFailure);
+    }
+  }
+
+  /**
+   * Counts the newly requested resources as outstanding, publishes the runtime status and tries
+   * to serve the request from the offers currently held.
+   */
+  public void onResourceRequest(final ResourceRequestProto resourceRequestProto) {
+    final int requestedResources = resourceRequestProto.getResourceCount();
+    this.outstandingRequestCounter = this.outstandingRequestCounter + requestedResources;
+    updateRuntimeStatus();
+    doResourceRequest(resourceRequestProto);
+  }
+
+  /**
+   * Sends a release to the executor named in the proto, removes it from the registry and
+   * publishes the updated runtime status.
+   */
+  public void onResourceRelease(final ResourceReleaseProto resourceReleaseProto) {
+    final String executorId = resourceReleaseProto.getIdentifier();
+    final EvaluatorRelease release = new EvaluatorRelease(executorId);
+    this.executors.releaseEvaluator(release);
+    this.executors.remove(executorId);
+    updateRuntimeStatus();
+  }
+
+ /**
+ * Greedily acquire resources by launching a Mesos Task(w/ our custom MesosExecutor) on REEF Evaluator request.
+ * Either called from onResourceRequest(for a new request) or resourceOffers(for an outstanding request).
+ * Every stored offer is either used for a launch or declined, after which the offer map is cleared;
+ * the part of the request that could not be launched is appended to outstandingRequests.
+ * TODO: reflect priority and rack/node locality specified in resourceRequestProto.
+ *
+ * @param resourceRequestProto the request to serve; its resource count, memory size, virtual cores
+ * and node names are read
+ */
+ private synchronized void doResourceRequest(final ResourceRequestProto resourceRequestProto) {
+ int tasksToLaunchCounter = resourceRequestProto.getResourceCount();
+
+ for (final Offer offer : this.offers.values()) {
+ // How many tasks of the requested size fit into this offer, by cpu and by memory.
+ // NOTE(review): integer division - a request with 0 virtual cores or 0 memory would throw
+ // ArithmeticException here; confirm that callers always set both.
+ final int cpuSlots = getCpu(offer) / resourceRequestProto.getVirtualCores();
+ final int memSlots = getMemory(offer) / resourceRequestProto.getMemorySize();
+ final int taskNum = Math.min(Math.min(cpuSlots, memSlots), tasksToLaunchCounter);
+
+ if (taskNum > 0 && satisfySlaveConstraint(resourceRequestProto, offer)) {
+ final List<TaskInfo> tasksToLaunch = new ArrayList<>();
+ tasksToLaunchCounter -= taskNum;
+
+ // Launch as many MesosTasks on the same node(offer) as possible to exploit locality.
+ for (int j = 0; j < taskNum; j++) {
+ // The same "<offerId>-<index>" string is used as executor id, task id and task name.
+ final String id = offer.getId().getValue() + "-" + String.valueOf(j);
+ final String executorLaunchCommand = getExecutorLaunchCommand(id, resourceRequestProto.getMemorySize());
+
+ final ExecutorInfo executorInfo = ExecutorInfo.newBuilder()
+ .setExecutorId(ExecutorID.newBuilder()
+ .setValue(id)
+ .build())
+ .setCommand(CommandInfo.newBuilder()
+ .setValue(executorLaunchCommand)
+ .addUris(URI.newBuilder().setValue(reefTarUri).build())
+ .build())
+ .build();
+
+ final TaskInfo taskInfo = TaskInfo.newBuilder()
+ .setTaskId(TaskID.newBuilder()
+ .setValue(id)
+ .build())
+ .setName(id)
+ .setSlaveId(offer.getSlaveId())
+ .addResources(Resource.newBuilder()
+ .setName("mem")
+ .setType(Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder()
+ .setValue(resourceRequestProto.getMemorySize())
+ .build())
+ .build())
+ .addResources(Resource.newBuilder()
+ .setName("cpus")
+ .setType(Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder()
+ .setValue(resourceRequestProto.getVirtualCores())
+ .build())
+ .build())
+ .setExecutor(executorInfo)
+ .build();
+
+ tasksToLaunch.add(taskInfo);
+ // Remembered until the executor reports TASK_STARTING (see handleNewExecutor) or the
+ // offer is rescinded (see offerRescinded). The whole request is stored for each task.
+ this.executorIdToLaunchedRequests.put(id, resourceRequestProto);
+ }
+
+ final Filters filters = Filters.newBuilder().setRefuseSeconds(0).build();
+ mesosMaster.launchTasks(Collections.singleton(offer.getId()), tasksToLaunch, filters);
+ } else {
+ mesosMaster.declineOffer(offer.getId());
+ }
+ }
+
+ // the offers are no longer valid(all launched or declined)
+ this.offers.clear();
+
+ // Save leftovers that couldn't be launched
+ // NOTE(review): the leftover request is enqueued even when tasksToLaunchCounter is 0.
+ outstandingRequests.add(ResourceRequestProto.newBuilder()
+ .mergeFrom(resourceRequestProto)
+ .setResourceCount(tasksToLaunchCounter)
+ .build());
+ }
+
+  /**
+   * Handles TASK_STARTING from a newly launched executor: registers the executor, reports the
+   * allocation to REEF and publishes the updated runtime status.
+   * An update for an executor with no recorded launch is logged and ignored.
+   *
+   * @param taskStatus the TASK_STARTING status; its task id is the executor id and its message is
+   *                   passed to MesosRemoteManager.getHandler to reach the executor
+   */
+  private void handleNewExecutor(final Protos.TaskStatus taskStatus) {
+    final String executorId = taskStatus.getTaskId().getValue();
+    final ResourceRequestProto resourceRequestProto = this.executorIdToLaunchedRequests.remove(executorId);
+
+    if (resourceRequestProto == null) {
+      // No launch is recorded for this id, e.g. a repeated status update or an entry already
+      // removed by offerRescinded. Ignoring it avoids a NullPointerException below and a second
+      // decrement of the outstanding counter.
+      LOG.log(Level.WARNING, "Ignoring TASK_STARTING for executor {0}: no launched request is recorded", executorId);
+      return;
+    }
+
+    final EventHandler<EvaluatorControl> evaluatorControlHandler =
+        this.mesosRemoteManager.getHandler(taskStatus.getMessage(), EvaluatorControl.class);
+    this.executors.add(executorId, resourceRequestProto.getMemorySize(), evaluatorControlHandler);
+
+    final ResourceAllocationProto alloc = DriverRuntimeProtocol.ResourceAllocationProto.newBuilder()
+        .setIdentifier(executorId)
+        .setNodeId(taskStatus.getSlaveId().getValue())
+        .setResourceMemory(resourceRequestProto.getMemorySize())
+        .build();
+    reefEventHandlers.onResourceAllocation(alloc);
+
+    this.outstandingRequestCounter--;
+    this.updateRuntimeStatus();
+  }
+
+  /**
+   * Publishes a RUNNING runtime status carrying the outstanding request count and the ids of all
+   * registered executors.
+   */
+  private synchronized void updateRuntimeStatus() {
+    final Builder runningStatus = RuntimeStatusProto.newBuilder();
+    runningStatus.setName(RUNTIME_NAME);
+    runningStatus.setState(State.RUNNING);
+    runningStatus.setOutstandingContainerRequests(this.outstandingRequestCounter);
+    runningStatus.addAllContainerAllocation(this.executors.getExecutorIds());
+
+    this.reefEventHandlers.onRuntimeStatus(runningStatus.build());
+  }
+
+ private void onRuntimeError(final Throwable throwable) {
+ this.mesosMaster.stop();
+ try {
+ this.schedulerDriverEStage.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ final Builder runtimeStatusBuilder = RuntimeStatusProto.newBuilder()
+ .setState(State.FAILED)
+ .setName(RUNTIME_NAME);
+
+ final Encoder<Throwable> codec = new ObjectSerializableCodec<>();
+ runtimeStatusBuilder.setError(ReefServiceProtos.RuntimeErrorProto.newBuilder()
+ .setName(RUNTIME_NAME)
+ .setMessage(throwable.getMessage())
+ .setException(ByteString.copyFrom(codec.encode(throwable)))
+ .build());
+
+ this.reefEventHandlers.onRuntimeStatus(runtimeStatusBuilder.build());
+ }
+
+ private boolean satisfySlaveConstraint(final ResourceRequestProto resourceRequestProto, final Offer offer) {
+ return resourceRequestProto.getNodeNameCount() == 0 ||
+ resourceRequestProto.getNodeNameList().contains(offer.getSlaveId().getValue());
+ }
+
+  /**
+   * @return the scalar value of the first "mem" resource in the offer, truncated to int,
+   *         or 0 if the offer has none
+   */
+  private int getMemory(final Offer offer) {
+    for (final Resource resource : offer.getResourcesList()) {
+      if (resource.getName().equals("mem")) {
+        return (int) resource.getScalar().getValue();
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * @return the scalar value of the first "cpus" resource in the offer, truncated to int,
+   *         or 0 if the offer has none
+   */
+  private int getCpu(final Offer offer) {
+    for (final Resource resource : offer.getResourcesList()) {
+      if (resource.getName().equals("cpus")) {
+        return (int) resource.getScalar().getValue();
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Builds the space-separated java command line that starts a REEFExecutor.
+   *
+   * @param executorID value passed after -mesos_executor_id
+   * @param memorySize heap limit in megabytes, passed as -Xmx
+   * @return the command line, ending with a single space
+   */
+  private String getExecutorLaunchCommand(final String executorID, final int memorySize) {
+    final String javaPath = System.getenv("JAVA_HOME") + "/bin/java";
+    final String evaluatorClasspath = StringUtils.join(this.classpath.getEvaluatorClasspath(), ":");
+
+    final StringBuilder command = new StringBuilder();
+    command.append(javaPath).append(' ');
+    command.append("-XX:PermSize=128m").append(' ');
+    command.append("-XX:MaxPermSize=128m").append(' ');
+    command.append("-Xmx").append(memorySize).append('m').append(' ');
+    command.append("-classpath ").append(evaluatorClasspath).append(' ');
+    command.append("-Djava.util.logging.config.class=org.apache.reef.util.logging.Config").append(' ');
+    command.append(REEFExecutor.class.getName()).append(' ');
+    command.append("-mesos_executor_id ").append(executorID).append(' ');
+    return command.toString();
+  }
+
+ private String getReefTarUri(final String jobIdentifier) {
+ try {
+ // Create REEF_TAR
+ final FileOutputStream fileOutputStream = new FileOutputStream(REEF_TAR);
+ final TarArchiveOutputStream tarArchiveOutputStream =
+ new TarArchiveOutputStream(new GZIPOutputStream(fileOutputStream));
+ final File globalFolder = new File(this.fileNames.getGlobalFolderPath());
+ final DirectoryStream<Path> directoryStream = Files.newDirectoryStream(globalFolder.toPath());
+
+ for (final Path path : directoryStream) {
+ tarArchiveOutputStream.putArchiveEntry(new TarArchiveEntry(path.toFile(),
+ globalFolder + "/" + path.getFileName()));
+
+ final BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(path.toFile()));
+ IOUtils.copy(bufferedInputStream, tarArchiveOutputStream);
+ bufferedInputStream.close();
+
+ tarArchiveOutputStream.closeArchiveEntry();
+ }
+ directoryStream.close();
+ tarArchiveOutputStream.close();
+ fileOutputStream.close();
+
+ // Upload REEF_TAR to HDFS
+ final FileSystem fileSystem = FileSystem.get(new Configuration());
+ final org.apache.hadoop.fs.Path src = new org.apache.hadoop.fs.Path(REEF_TAR);
+ final String reefTarUri = fileSystem.getUri().toString() + "/" + jobIdentifier + "/" + REEF_TAR;
+ final org.apache.hadoop.fs.Path dst = new org.apache.hadoop.fs.Path(reefTarUri);
+ fileSystem.copyFromLocalFile(src, dst);
+
+ return reefTarUri;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/parameters/MesosMasterIp.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/parameters/MesosMasterIp.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/parameters/MesosMasterIp.java
new file mode 100644
index 0000000..863f5b3
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/parameters/MesosMasterIp.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter of type String holding the address of the Mesos Master.
+ * No default value is declared here.
+ */
+@NamedParameter(doc = "The ip address of Mesos Master")
+public final class MesosMasterIp implements Name<String> {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/EvaluatorControlHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/EvaluatorControlHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/EvaluatorControlHandler.java
new file mode 100644
index 0000000..20287e1
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/EvaluatorControlHandler.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.evaluator;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+
+/**
+ * Receives EvaluatorControl messages and forwards the contained launch or release request to
+ * the REEFExecutor.
+ */
+@EvaluatorSide
+@Private
+final class EvaluatorControlHandler implements EventHandler<RemoteMessage<EvaluatorControl>> {
+  // REEFExecutor takes this handler as a constructor argument, so the executor itself can only
+  // be obtained lazily through an InjectionFuture.
+  private final InjectionFuture<REEFExecutor> mesosExecutor;
+
+  @Inject
+  EvaluatorControlHandler(final InjectionFuture<REEFExecutor> mesosExecutor) {
+    this.mesosExecutor = mesosExecutor;
+  }
+
+  /**
+   * Dispatches the message to onEvaluatorLaunch or onEvaluatorRelease; a launch takes precedence
+   * if both are set.
+   *
+   * @param remoteMessage the wrapped EvaluatorControl message
+   * @throws IllegalArgumentException if the message carries neither a launch nor a release
+   */
+  @Override
+  public void onNext(final RemoteMessage<EvaluatorControl> remoteMessage) {
+    final EvaluatorControl evaluatorControl = remoteMessage.getMessage();
+    if (evaluatorControl.getEvaluatorLaunch() != null) {
+      this.mesosExecutor.get().onEvaluatorLaunch(evaluatorControl.getEvaluatorLaunch());
+    } else if (evaluatorControl.getEvaluatorRelease() != null) {
+      this.mesosExecutor.get().onEvaluatorRelease(evaluatorControl.getEvaluatorRelease());
+    } else {
+      // Say what was wrong instead of throwing an exception without any message.
+      throw new IllegalArgumentException(
+          "EvaluatorControl message contains neither an EvaluatorLaunch nor an EvaluatorRelease");
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/REEFExecutor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/REEFExecutor.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/REEFExecutor.java
new file mode 100644
index 0000000..7225f92
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/REEFExecutor.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.evaluator;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.mesos.evaluator.parameters.MesosExecutorId;
+import org.apache.reef.runtime.mesos.util.EvaluatorControl;
+import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
+import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
+import org.apache.reef.runtime.mesos.util.MesosRemoteManager;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.mesos.Executor;
+import org.apache.mesos.ExecutorDriver;
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.SlaveInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public final class REEFExecutor implements Executor {
+ private final static Logger LOG = Logger.getLogger(REEFExecutor.class.getName());
+
+ private final MesosExecutorDriver mesosExecutorDriver;
+ private final MesosRemoteManager mesosRemoteManager;
+ private final ExecutorService executorService;
+ private final REEFFileNames fileNames;
+ private final String mesosExecutorId;
+
+ private Process evaluatorProcess;
+ private Integer evaluatorProcessExitValue;
+
+ /**
+ * Stores the collaborators, registers the control handler for EvaluatorControl messages and
+ * creates the Mesos executor driver and a cached thread pool. The driver is not run here.
+ *
+ * @param evaluatorControlHandler handler registered for EvaluatorControl messages
+ * @param mesosRemoteManager remote manager the handler is registered with
+ * @param fileNames provides the local folder and stdout/stderr file names
+ * @param mesosExecutorId id of this executor, used as the task id in status updates
+ */
+ @Inject
+ REEFExecutor(final EvaluatorControlHandler evaluatorControlHandler,
+ final MesosRemoteManager mesosRemoteManager,
+ final REEFFileNames fileNames,
+ final @Parameter(MesosExecutorId.class) String mesosExecutorId) {
+ this.mesosRemoteManager = mesosRemoteManager;
+ this.mesosRemoteManager.registerHandler(EvaluatorControl.class, evaluatorControlHandler);
+ // Note: `this` is handed to the driver before the remaining fields are assigned.
+ this.mesosExecutorDriver = new MesosExecutorDriver(this);
+ this.executorService = Executors.newCachedThreadPool();
+ this.fileNames = fileNames;
+ this.mesosExecutorId = mesosExecutorId;
+ }
+
+  /**
+   * Logs the registration details at FINEST level.
+   */
+  @Override
+  public final void registered(final ExecutorDriver driver,
+                               final ExecutorInfo executorInfo,
+                               final FrameworkInfo frameworkInfo,
+                               final SlaveInfo slaveInfo) {
+    final Object[] logArguments = new Object[]{driver, executorInfo, frameworkInfo, slaveInfo};
+    LOG.log(Level.FINEST, "Executor registered. driver: {0} executorInfo: {1} frameworkInfo: {2} slaveInfo {3}",
+        logArguments);
+  }
+
+  /**
+   * Logs the re-registration at FINEST level.
+   */
+  @Override
+  public final void reregistered(final ExecutorDriver driver, final SlaveInfo slaveInfo) {
+    final Object[] logArguments = {driver};
+    LOG.log(Level.FINEST, "Executor reregistered. driver: {0}", logArguments);
+  }
+
+  /**
+   * Treats a disconnect as a runtime error.
+   */
+  @Override
+  public final void disconnected(final ExecutorDriver driver) {
+    onRuntimeError();
+  }
+
+  /**
+   * Answers a task launch with a TASK_STARTING update whose message is this executor's remote
+   * identifier. The task is assumed to be long-running and to manage a REEF Evaluator process
+   * through the Mesos Executor interface.
+   */
+  @Override
+  public final void launchTask(final ExecutorDriver driver, final TaskInfo task) {
+    final TaskID taskId = TaskID.newBuilder().setValue(this.mesosExecutorId).build();
+    final TaskStatus starting = TaskStatus.newBuilder()
+        .setTaskId(taskId)
+        .setState(TaskState.TASK_STARTING)
+        .setSlaveId(task.getSlaveId())
+        .setMessage(this.mesosRemoteManager.getMyIdentifier())
+        .build();
+    driver.sendStatusUpdate(starting);
+  }
+
+  /**
+   * Stops the evaluator and this executor, whichever task id is given.
+   */
+  @Override
+  public final void killTask(final ExecutorDriver driver, final TaskID taskId) {
+    onStop();
+  }
+
+  /**
+   * Logs a framework message at FINEST level; the message is not processed further.
+   */
+  @Override
+  public final void frameworkMessage(final ExecutorDriver driver, final byte[] data) {
+    final Object[] logArguments = new Object[]{driver, data};
+    LOG.log(Level.FINEST, "Framework Messge. ExecutorDriver: {0}, data: {1}.", logArguments);
+  }
+
+  /**
+   * Stops the evaluator and this executor.
+   */
+  @Override
+  public final void shutdown(final ExecutorDriver driver) {
+    onStop();
+  }
+
+  /**
+   * Treats an error reported by Mesos as a runtime error; the message itself is not used.
+   */
+  @Override
+  public final void error(final ExecutorDriver driver, final String message) {
+    onRuntimeError();
+  }
+
+ /////////////////////////////////////////////////////////////////
+ // HELPER METHODS
+
+  /**
+   * Runs the Mesos executor driver on the executor service and logs the status it ends with.
+   */
+  private void onStart() {
+    // A plain Runnable is enough: the pool supplies the thread, so subclassing Thread only
+    // created a Thread object that was never started.
+    this.executorService.submit(new Runnable() {
+      @Override
+      public void run() {
+        final Status status = mesosExecutorDriver.run();
+        LOG.log(Level.INFO, "MesosExecutorDriver ended with status {0}", status);
+      }
+    });
+  }
+
+ private void onStop() {
+ // Shutdown REEF Evaluator
+ if (this.evaluatorProcess != null) {
+ this.evaluatorProcess.destroy();
+ mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder()
+ .setValue(mesosExecutorId)
+ .build())
+ .setState(TaskState.TASK_FINISHED)
+ .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue))
+ .build());
+ } else {
+ mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder()
+ .setValue(mesosExecutorId)
+ .build())
+ .setState(TaskState.TASK_FINISHED)
+ .setData(ByteString.copyFromUtf8("eval_not_run")) // TODO: a hack to pass closeEvaluator test, replace this with a better interface
+ .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue))
+ .build());
+ }
+
+ // Shutdown Mesos Executor
+ this.executorService.shutdown();
+ this.mesosExecutorDriver.stop();
+ }
+
+ private void onRuntimeError() {
+ // Shutdown REEF Evaluator
+ if (this.evaluatorProcess != null) {
+ this.evaluatorProcess.destroy();
+ }
+ mesosExecutorDriver.sendStatusUpdate(TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder()
+ .setValue(mesosExecutorId)
+ .build())
+ .setState(TaskState.TASK_FAILED)
+ .setMessage("Evaluator Process exited with status " + String.valueOf(evaluatorProcessExitValue))
+ .build());
+
+ // Shutdown Mesos Executor
+ this.executorService.shutdown();
+ this.mesosExecutorDriver.stop();
+ }
+
+  /**
+   * Handles a release request by stopping the evaluator and this executor.
+   * With assertions enabled, the request's identifier must equal this executor's id.
+   */
+  public final void onEvaluatorRelease(final EvaluatorRelease evaluatorRelease) {
+    final String releaseDescription = evaluatorRelease.toString();
+    LOG.log(Level.INFO, "Release!!!! {0}", releaseDescription);
+    assert(evaluatorRelease.getIdentifier().toString().equals(this.mesosExecutorId));
+    onStop();
+  }
+
+  /**
+   * Handles a launch request on a separate single thread: copies this executor's folder from the
+   * default Hadoop file system into the local folder, starts the evaluator process with the
+   * requested command and waits for it to exit, recording its exit value.
+   * With assertions enabled, the request's identifier must equal this executor's id.
+   *
+   * @param evaluatorLaunch the launch request; its command is split on single spaces
+   */
+  public final void onEvaluatorLaunch(final EvaluatorLaunch evaluatorLaunch) {
+    LOG.log(Level.INFO, "Launch!!!! {0}", evaluatorLaunch.toString());
+    assert(evaluatorLaunch.getIdentifier().toString().equals(this.mesosExecutorId));
+    final ExecutorService evaluatorLaunchExecutorService = Executors.newSingleThreadExecutor();
+    // A Runnable is enough here; the executor service supplies the thread.
+    evaluatorLaunchExecutorService.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          final List<String> command = Arrays.asList(evaluatorLaunch.getCommand().toString().split(" "));
+          LOG.log(Level.INFO, "Command!!!! {0}", command);
+          final FileSystem fileSystem = FileSystem.get(new Configuration());
+          try {
+            final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + mesosExecutorId);
+            final File localFolder = new File(fileNames.getREEFFolderName(), fileNames.getLocalFolderName());
+
+            FileUtil.copy(fileSystem, hdfsFolder, localFolder, true, new Configuration());
+
+            evaluatorProcess = new ProcessBuilder()
+                .command(command)
+                .redirectError(new File(fileNames.getEvaluatorStderrFileName()))
+                .redirectOutput(new File(fileNames.getEvaluatorStdoutFileName()))
+                .start();
+
+            evaluatorProcessExitValue = evaluatorProcess.waitFor();
+          } finally {
+            // Close the file system on the failure path as well.
+            fileSystem.close();
+          }
+        } catch (final IOException e) {
+          // The Future returned by submit() is discarded, so an exception thrown from here would
+          // otherwise vanish without a trace. Log it before rethrowing.
+          LOG.log(Level.SEVERE, "Failed to launch the Evaluator process", e);
+          throw new RuntimeException(e);
+        } catch (final InterruptedException e) {
+          // Restore the interrupt flag for the owning thread.
+          Thread.currentThread().interrupt();
+          LOG.log(Level.SEVERE, "Interrupted while waiting for the Evaluator process", e);
+          throw new RuntimeException(e);
+        }
+      }
+    });
+    evaluatorLaunchExecutorService.shutdown();
+  }
+
+  /**
+   * Builds a Tang configuration from the command line, with the short name of MesosExecutorId
+   * registered.
+   *
+   * @param args the command line arguments
+   * @return the resulting configuration
+   * @throws IOException declared by this method; presumably raised while processing the arguments
+   */
+  public static org.apache.reef.tang.Configuration parseCommandLine(final String[] args) throws IOException {
+    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+
+    final CommandLine commandLine = new CommandLine(confBuilder)
+        .registerShortNameOfClass(MesosExecutorId.class);
+    commandLine.processCommandLine(args);
+
+    return confBuilder.build();
+  }
+
+  /**
+   * The starting point of the executor: injects a REEFExecutor from the command line
+   * configuration and starts it.
+   */
+  public static void main(final String[] args) throws Exception {
+    final org.apache.reef.tang.Configuration commandLineConf = parseCommandLine(args);
+    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+    final REEFExecutor reefExecutor = injector.getInstance(REEFExecutor.class);
+    reefExecutor.onStart();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/parameters/MesosExecutorId.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/parameters/MesosExecutorId.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/parameters/MesosExecutorId.java
new file mode 100644
index 0000000..0f2044c
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/evaluator/parameters/MesosExecutorId.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+ /**
+  * Tang named parameter carrying the executor's id as a {@code String}.
+  * Its short name is {@code mesos_executor_id}.
+  */
+ @NamedParameter(short_name = "mesos_executor_id", doc = "The Executor's id")
+ public final class MesosExecutorId implements Name<String> {
+ }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/HDFSConfigurationConstructor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/HDFSConfigurationConstructor.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/HDFSConfigurationConstructor.java
new file mode 100644
index 0000000..f028274
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/HDFSConfigurationConstructor.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.reef.tang.ExternalConstructor;
+
+import javax.inject.Inject;
+
+ /**
+  * Tang {@link ExternalConstructor} that supplies Hadoop {@link Configuration} objects.
+  */
+ public final class HDFSConfigurationConstructor implements ExternalConstructor<Configuration> {
+
+   /**
+    * Package-private constructor annotated for injection.
+    */
+   @Inject
+   HDFSConfigurationConstructor() {
+   }
+
+   /**
+    * @return a new {@link Configuration} created with its no-argument constructor
+    */
+   @Override
+   public Configuration newInstance() {
+     final Configuration hadoopConf = new Configuration();
+     return hadoopConf;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosErrorHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosErrorHandler.java
new file mode 100644
index 0000000..19ec56a
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosErrorHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.util;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * ErrorHandler for MesosRemoteManager.
+ * TODO: Replace this class once Tang's namespace feature is enabled
+ */
+public final class MesosErrorHandler implements EventHandler<Throwable> {
+
+ private static final Logger LOG = Logger.getLogger(MesosErrorHandler.class.getName());
+
+ @Inject
+ MesosErrorHandler() {
+ }
+
+ @Override
+ public void onNext(final Throwable e) {
+ LOG.log(Level.SEVERE, "MesosRemoteManager Error", e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
new file mode 100644
index 0000000..b32475d
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.util;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteIdentifierFactory;
+import org.apache.reef.wake.remote.RemoteManager;
+import org.apache.reef.wake.remote.RemoteMessage;
+import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.remote.impl.StringCodec;
+
+import javax.inject.Inject;
+
+/**
+ * Since the existing RemoteManager cannot use an additional codec,
+ * we need this additional RemoteManager to use MesosMessageCodec.
+ * TODO: Replace this class once Tang's namespace feature is enabled
+ */
+public final class MesosRemoteManager {
+ private final RemoteManager raw;
+ private final RemoteIdentifierFactory factory;
+
+ @Inject
+ MesosRemoteManager(final RemoteIdentifierFactory factory,
+ final MesosErrorHandler mesosErrorHandler,
+ final MesosRemoteManagerCodec codec) {
+ this.factory = factory;
+ this.raw = new DefaultRemoteManagerImplementation("MESOS_EXECUTOR", "##UNKNOWN##", 0,
+ codec, mesosErrorHandler, false, 3, 10000);
+ }
+
+ public <T> EventHandler<T> getHandler(
+ final String destinationIdentifier, final Class<? extends T> messageType) {
+ return this.raw.getHandler(factory.getNewInstance(destinationIdentifier), messageType);
+ }
+
+ public <T, U extends T> AutoCloseable registerHandler(
+ final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) {
+ return this.raw.registerHandler(messageType, theHandler);
+ }
+
+ public String getMyIdentifier() {
+ return this.raw.getMyIdentifier().toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManagerCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManagerCodec.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManagerCodec.java
new file mode 100644
index 0000000..aa4c853
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManagerCodec.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.util;
+
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+ /**
+  * Codec that converts {@link EvaluatorControl} messages to and from Avro's
+  * binary encoding. MesosRemoteManager receives an instance of this codec.
+  */
+ public class MesosRemoteManagerCodec implements Codec<EvaluatorControl> {
+   private static final Logger LOG = Logger.getLogger(MesosRemoteManagerCodec.class.getName());
+
+   @Inject
+   public MesosRemoteManagerCodec() {
+   }
+
+   /**
+    * Serializes the given message with an Avro binary encoder.
+    *
+    * @param evaluatorControl the message to serialize
+    * @return the encoded bytes
+    * @throws RemoteRuntimeException wrapping any {@link IOException} raised while encoding
+    */
+   @Override
+   public byte[] encode(final EvaluatorControl evaluatorControl) {
+     try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+       LOG.log(Level.INFO, "Before Encoding: {0}", evaluatorControl.toString());
+       final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+       final DatumWriter<EvaluatorControl> writer = new SpecificDatumWriter<>(EvaluatorControl.getClassSchema());
+       writer.write(evaluatorControl, encoder);
+       // The encoder buffers its output; flush before reading the bytes back.
+       encoder.flush();
+       LOG.log(Level.INFO, "After Encoding");
+       return out.toByteArray();
+     } catch (final IOException ex) {
+       throw new RemoteRuntimeException(ex);
+     }
+   }
+
+   /**
+    * Deserializes a message from Avro binary data.
+    *
+    * @param buf the encoded bytes
+    * @return the decoded message
+    * @throws RemoteRuntimeException wrapping any {@link IOException} raised while decoding
+    */
+   @Override
+   public EvaluatorControl decode(final byte[] buf) {
+     try {
+       // Log the size: passing the array itself would only print its identity ("[B@...").
+       LOG.log(Level.INFO, "Before Decoding: {0} bytes", Integer.toString(buf.length));
+       final SpecificDatumReader<EvaluatorControl> reader = new SpecificDatumReader<>(EvaluatorControl.getClassSchema());
+       final Decoder decoder = DecoderFactory.get().binaryDecoder(buf, null);
+       final EvaluatorControl evaluatorControl = reader.read(null, decoder);
+       // Logged only once the read has actually succeeded.
+       LOG.log(Level.INFO, "After Decoding");
+       return evaluatorControl;
+     } catch (final IOException ex) {
+       throw new RemoteRuntimeException(ex);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/pom.xml b/lang/java/reef-runtime-yarn/pom.xml
new file mode 100644
index 0000000..faa1c22
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.11.0-incubating-SNAPSHOT</version>
+ </parent>
+ <name>REEF Runtime for YARN</name>
+ <artifactId>reef-runtime-yarn</artifactId>
+
+ <!-- Dependencies of the YARN runtime module. -->
+ <dependencies>
+ <!-- Sibling REEF modules, resolved with this project's groupId and version. -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-utils-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Hadoop/YARN artifacts use 'provided' scope. No version is declared here;
+ NOTE(review): presumably managed by the reef-project parent POM, confirm. -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Test-scope dependency only. -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <!-- Packages files matching *.xml and *.properties from ${basedir}/conf
+ under META-INF/, with resource filtering disabled. -->
+ <build>
+ <resources>
+ <resource>
+ <targetPath>META-INF/</targetPath>
+ <filtering>false</filtering>
+ <directory>${basedir}/conf</directory>
+ <includes>
+ <include>*.xml</include>
+ <include>*.properties</include>
+ </includes>
+ <!-- Empty: no exclude patterns are configured. -->
+ <excludes>
+ </excludes>
+ </resource>
+ </resources>
+ </build>
+
+</project>