You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:54 UTC
[21/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
new file mode 100644
index 0000000..cdd8b90
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver;
+import org.apache.reef.runtime.local.process.RunnableProcess;
+import org.apache.reef.runtime.local.process.RunnableProcessObserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A Container that runs an Evaluator in a Process.
+ * <p/>
+ * The command line passed to {@link #run(List)} is wrapped in a {@link RunnableProcess},
+ * which is executed on a dedicated thread owned by this container.
+ */
+@Private
+@TaskSide
+final class ProcessContainer implements Container {
+
+ private static final Logger LOG = Logger.getLogger(ProcessContainer.class.getName());
+
+ private final String errorHandlerRID;
+ private final String nodeID;
+ private final File folder;
+ private final String containedID; // the container ID, see getContainerID()
+ private final int megaBytes;
+ private final int numberOfCores;
+ private final REEFFileNames fileNames;
+ private final File reefFolder;
+ private final File localFolder;
+ private final File globalFolder;
+ private final RunnableProcessObserver processObserver;
+ private Thread theThread; // null until run() has been called
+ private RunnableProcess process; // null until run() has been called
+
+ /**
+ * Creates the container and, as a side effect, creates the local and global
+ * sub-folders of the REEF folder inside the given folder.
+ *
+ * @param errorHandlerRID the remoteID of the error handler.
+ * @param nodeID the ID of the (fake) node this Container is instantiated on
+ * @param containedID the ID used to identify this container uniquely
+ * @param folder the folder in which logs etc. will be deposited
+ * @param megaBytes the memory size reported by getMemory() (presumably in megabytes, per the name)
+ * @param numberOfCores the number of cores reported by getNumberOfCores()
+ * @param fileNames provides the folder names and the evaluator stdout / stderr file names
+ * @param processObserver the observer handed to the RunnableProcess created in run()
+ */
+ ProcessContainer(final String errorHandlerRID,
+ final String nodeID,
+ final String containedID,
+ final File folder,
+ final int megaBytes,
+ final int numberOfCores,
+ final REEFFileNames fileNames,
+ final ReefRunnableProcessObserver processObserver) {
+ this.errorHandlerRID = errorHandlerRID;
+ this.nodeID = nodeID;
+ this.containedID = containedID;
+ this.folder = folder;
+ this.megaBytes = megaBytes;
+ this.numberOfCores = numberOfCores;
+ this.fileNames = fileNames;
+ this.processObserver = processObserver;
+ this.reefFolder = new File(folder, fileNames.getREEFFolderName());
+ this.localFolder = new File(reefFolder, fileNames.getLocalFolderName());
+ this.localFolder.mkdirs(); // return value is ignored; a failure surfaces later when files are copied
+ this.globalFolder = new File(reefFolder, fileNames.getGlobalFolderName());
+ this.globalFolder.mkdirs();
+ }
+
+ private static void copy(final Iterable<File> files, final File folder) throws IOException { // copies each file into folder under its own name
+ for (final File sourceFile : files) {
+ final File destinationFile = new File(folder, sourceFile.getName());
+ if (Files.isSymbolicLink(sourceFile.toPath())) { // re-create the link instead of copying what it points to
+ final Path linkTargetPath = Files.readSymbolicLink(sourceFile.toPath());
+ Files.createSymbolicLink(destinationFile.toPath(), linkTargetPath);
+ } else {
+ Files.copy(sourceFile.toPath(), destinationFile.toPath()); // no REPLACE_EXISTING: an existing destination raises an IOException
+ }
+ }
+ }
+
+ @Override
+ public void addLocalFiles(final Iterable<File> files) {
+ try {
+ copy(files, this.localFolder);
+ } catch (final IOException e) {
+ throw new RuntimeException("Unable to copy files to the evaluator folder.", e);
+ }
+ }
+
+ @Override
+ public void addGlobalFiles(final File globalFolder) {
+ try {
+ copy(Arrays.asList(globalFolder.listFiles()), this.globalFolder); // NOTE(review): listFiles() returns null if globalFolder is not a readable directory, which would NPE here
+ } catch (final IOException e) {
+ throw new RuntimeException("Unable to copy files to the evaluator folder.", e);
+ }
+ }
+
+ @Override
+ public void run(final List<String> commandLine) {
+ this.process = new RunnableProcess(commandLine,
+ this.containedID,
+ this.folder,
+ this.processObserver,
+ this.fileNames.getEvaluatorStdoutFileName(),
+ this.fileNames.getEvaluatorStderrFileName());
+ this.theThread = new Thread(this.process); // the process runs on its own thread; isRunning() tracks this thread
+ this.theThread.start();
+ }
+
+ @Override
+ public final boolean isRunning() {
+ return null != this.theThread && this.theThread.isAlive();
+ }
+
+ @Override
+ public final int getMemory() {
+ return this.megaBytes;
+ }
+
+ @Override
+ public final int getNumberOfCores() {
+ return this.numberOfCores;
+ }
+
+ @Override
+ public File getFolder() {
+ return this.folder;
+ }
+
+ @Override
+ public String getNodeID() {
+ return this.nodeID;
+ }
+
+ @Override
+ public String getContainerID() {
+ return this.containedID;
+ }
+
+ @Override
+ public void close() {
+ if (isRunning()) { // nothing to do when run() was never called or the thread has already finished
+ LOG.log(Level.WARNING, "Force-closing a container that is still running: {0}", this);
+ this.process.cancel();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ProcessContainer{" +
+ "containedID='" + containedID + '\'' +
+ ", nodeID='" + nodeID + '\'' +
+ ", errorHandlerRID='" + errorHandlerRID + '\'' +
+ ", folder=" + folder +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
new file mode 100644
index 0000000..9708e77
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.LaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.runtime.local.client.parameters.DefaultMemorySize;
+import org.apache.reef.runtime.local.client.parameters.DefaultNumberOfCores;
+import org.apache.reef.runtime.local.driver.parameters.GlobalFiles;
+import org.apache.reef.runtime.local.driver.parameters.GlobalLibraries;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A resource manager that uses threads to execute containers.
+ * <p/>
+ * Resource requests are queued and satisfied as containers become available.
+ * All request handlers of this class synchronize on the ContainerManager instance.
+ */
+@Private
+@DriverSide
+public final class ResourceManager {
+
+ private final static Logger LOG = Logger.getLogger(ResourceManager.class.getName());
+
+ private final ResourceRequestQueue requestQueue = new ResourceRequestQueue();
+
+ private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler;
+ private final ContainerManager theContainers; // also serves as the lock object for all handlers
+ private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler;
+ private final int defaultMemorySize;
+ private final int defaultNumberOfCores;
+ private final ConfigurationSerializer configurationSerializer;
+ private final RemoteManager remoteManager;
+ private final REEFFileNames fileNames;
+ private final ClasspathProvider classpathProvider;
+ private final double jvmHeapFactor; // 1.0 - JVMHeapSlack; multiplied with the container memory at launch
+ private final LoggingScopeFactory loggingScopeFactory;
+
+ @Inject
+ ResourceManager(
+ final ContainerManager containerManager,
+ final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler,
+ final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler,
+ final @Parameter(GlobalLibraries.class) Set<String> globalLibraries, // NOTE(review): not referenced in the constructor body
+ final @Parameter(GlobalFiles.class) Set<String> globalFiles, // NOTE(review): not referenced in the constructor body
+ final @Parameter(DefaultMemorySize.class) int defaultMemorySize,
+ final @Parameter(DefaultNumberOfCores.class) int defaultNumberOfCores,
+ final @Parameter(JVMHeapSlack.class) double jvmHeapSlack,
+ final ConfigurationSerializer configurationSerializer,
+ final RemoteManager remoteManager,
+ final REEFFileNames fileNames,
+ final ClasspathProvider classpathProvider,
+ final LoggingScopeFactory loggingScopeFactory) {
+
+ this.theContainers = containerManager;
+ this.allocationHandler = allocationHandler;
+ this.runtimeStatusHandlerEventHandler = runtimeStatusHandlerEventHandler;
+ this.configurationSerializer = configurationSerializer;
+ this.remoteManager = remoteManager;
+ this.defaultMemorySize = defaultMemorySize;
+ this.defaultNumberOfCores = defaultNumberOfCores;
+ this.fileNames = fileNames;
+ this.classpathProvider = classpathProvider;
+ this.jvmHeapFactor = 1.0 - jvmHeapSlack;
+ this.loggingScopeFactory = loggingScopeFactory;
+
+ LOG.log(Level.FINE, "Instantiated 'ResourceManager'");
+ }
+
+ /**
+ * Extracts the files out of the launchRequest.
+ *
+ * @param launchRequest the ResourceLaunchProto to parse
+ * @return a list of files set in the given ResourceLaunchProto, each resolved to its absolute form
+ */
+ private static List<File> getLocalFiles(final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
+ final List<File> files = new ArrayList<>(); // Libraries local to this evaluator
+ for (final ReefServiceProtos.FileResourceProto frp : launchRequest.getFileList()) {
+ files.add(new File(frp.getPath()).getAbsoluteFile());
+ }
+ return files;
+ }
+
+ /**
+ * Receives a resource request.
+ * <p/>
+ * The request is appended to the request queue. If the request can be met, it will
+ * also be satisfied immediately.
+ *
+ * @param resourceRequest the resource request to be handled.
+ */
+ final void onResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto resourceRequest) {
+ synchronized (this.theContainers) {
+ this.requestQueue.add(new ResourceRequest(resourceRequest));
+ this.checkRequestQueue();
+ }
+ }
+
+ /**
+ * Receives and processes a resource release request.
+ * <p/>
+ * After the container has been released, the request queue is re-checked.
+ *
+ * @param releaseRequest the release request to be processed
+ */
+ final void onResourceReleaseRequest(final DriverRuntimeProtocol.ResourceReleaseProto releaseRequest) {
+ synchronized (this.theContainers) {
+ LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier());
+ this.theContainers.release(releaseRequest.getIdentifier());
+ this.checkRequestQueue();
+ }
+ }
+
+ /**
+ * Called when the ReefRunnableProcessObserver detects that the Evaluator process has exited.
+ * <p/>
+ * Releases the container with the given ID and re-checks the request queue.
+ *
+ * @param evaluatorId the ID of the Evaluator that exited.
+ */
+ public final void onEvaluatorExit(final String evaluatorId) {
+ synchronized (this.theContainers) {
+ this.theContainers.release(evaluatorId);
+ this.checkRequestQueue();
+ }
+ }
+
+ /**
+ * Processes a resource launch request.
+ * <p/>
+ * Looks up the container by the request's identifier, copies the global and local files
+ * into it, writes the evaluator configuration file into the container folder, assembles
+ * the JVM or CLR launch command and runs it in the container.
+ *
+ * @param launchRequest the launch request to be processed.
+ * @throws RuntimeException if the evaluator configuration cannot be written.
+ * @throws IllegalArgumentException if the request's type is neither JVM nor CLR.
+ */
+ final void onResourceLaunchRequest(
+ final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
+
+ synchronized (this.theContainers) {
+
+ final Container c = this.theContainers.get(launchRequest.getIdentifier()); // NOTE(review): used below without a null check; confirm get() never returns null
+
+ try (final LoggingScope lb = this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:evaluatorConfigurationFile")) {
+ // Add the global files and libraries.
+ c.addGlobalFiles(this.fileNames.getGlobalFolder());
+ c.addLocalFiles(getLocalFiles(launchRequest));
+
+ // Make the configuration file of the evaluator.
+ final File evaluatorConfigurationFile = new File(c.getFolder(), fileNames.getEvaluatorConfigurationPath());
+
+ try {
+ this.configurationSerializer.toFile(this.configurationSerializer.fromString(launchRequest.getEvaluatorConf()),
+ evaluatorConfigurationFile);
+ } catch (final IOException | BindException e) {
+ throw new RuntimeException("Unable to write configuration.", e);
+ }
+ }
+
+ try (final LoggingScope lc = this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:runCommand")) {
+ // Assemble the command line
+ final LaunchCommandBuilder commandBuilder;
+ switch (launchRequest.getType()) {
+ case JVM:
+ commandBuilder = new JavaLaunchCommandBuilder()
+ .setClassPath(this.classpathProvider.getEvaluatorClasspath());
+ break;
+ case CLR:
+ commandBuilder = new CLRLaunchCommandBuilder();
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported container type: " + launchRequest.getType());
+ }
+
+ final List<String> command = commandBuilder
+ .setErrorHandlerRID(this.remoteManager.getMyIdentifier())
+ .setLaunchID(c.getNodeID()) // the node ID, not the container ID, is used as the launch ID
+ .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath())
+ .setMemory((int) (this.jvmHeapFactor * c.getMemory())) // container memory reduced by the heap slack, truncated to int
+ .build();
+
+ LOG.log(Level.FINEST, "Launching container: {0}", c);
+ c.run(command);
+ }
+ }
+ }
+
+ /**
+ * Checks the allocation queue for new allocations and if there are any
+ * satisfies them.
+ * <p/>
+ * While a container is available and a request is outstanding, one container is
+ * allocated, announced to the allocation handler and followed by a runtime status
+ * update; the method then calls itself to try the next one. When nothing (more) can
+ * be satisfied, one further runtime status update is sent.
+ * <p/>
+ * All callers in this class invoke it while holding the lock on the ContainerManager.
+ */
+ private void checkRequestQueue() {
+
+ if (this.theContainers.hasContainerAvailable() && this.requestQueue.hasOutStandingRequests()) {
+
+ // Record the satisfaction of one request and get its details.
+ final DriverRuntimeProtocol.ResourceRequestProto requestProto = this.requestQueue.satisfyOne();
+
+ // Allocate a Container
+ final Container container = this.theContainers.allocateOne(
+ requestProto.hasMemorySize() ? requestProto.getMemorySize() : this.defaultMemorySize,
+ requestProto.hasVirtualCores() ? requestProto.getVirtualCores() : this.defaultNumberOfCores);
+
+ // Tell the receivers about it
+ final DriverRuntimeProtocol.ResourceAllocationProto alloc =
+ DriverRuntimeProtocol.ResourceAllocationProto.newBuilder()
+ .setIdentifier(container.getContainerID())
+ .setNodeId(container.getNodeID())
+ .setResourceMemory(container.getMemory())
+ .setVirtualCores(container.getNumberOfCores())
+ .build();
+
+ LOG.log(Level.FINEST, "Allocating container: {0}", container);
+ this.allocationHandler.onNext(alloc);
+
+ // update REEF
+ this.sendRuntimeStatus();
+
+ // Check whether we can satisfy another one.
+ this.checkRequestQueue();
+
+ } else {
+ this.sendRuntimeStatus();
+ }
+ }
+
+ private void sendRuntimeStatus() { // reports state RUNNING, the number of queued requests and the allocated container IDs
+
+ final DriverRuntimeProtocol.RuntimeStatusProto msg =
+ DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
+ .setName("LOCAL")
+ .setState(ReefServiceProtos.State.RUNNING)
+ .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests())
+ .addAllContainerAllocation(this.theContainers.getAllocatedContainerIDs())
+ .build();
+
+ LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}",
+ new Object[]{msg.getContainerAllocationCount(), msg.getOutstandingContainerRequests()});
+ this.runtimeStatusHandlerEventHandler.onNext(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
new file mode 100644
index 0000000..bbb20a3
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+
+/**
+ * Manages a ResourceRequestProto and its satisfaction.
+ * <p/>
+ * Keeps a counter of how many of the requested resources have been allocated so far.
+ */
+@Private
+@DriverSide
+final class ResourceRequest {
+
+ private final DriverRuntimeProtocol.ResourceRequestProto req;
+ private int satisfied = 0; // number of resources allocated to this request so far
+
+ ResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto req) {
+ if (null == req) {
+ throw new IllegalArgumentException("Can't instantiate a ResourceRequest without a ResourceRequestProto");
+ }
+ this.req = req;
+ }
+
+ /**
+ * Records that one resource has been allocated to this Request.
+ * <p/>
+ * The counter is incremented before it is checked, so it stays incremented
+ * even when the exception below is thrown.
+ *
+ * @throws IllegalStateException if more resources have been recorded than were requested.
+ */
+ final void satisfyOne() {
+ this.satisfied += 1;
+ if (this.satisfied > req.getResourceCount()) {
+ throw new IllegalStateException("This request has been oversatisfied.");
+ }
+ }
+
+ /**
+ * @return true if the number of resources recorded via satisfyOne() equals the
+ * number of resources requested, false otherwise.
+ */
+ final boolean isSatisfied() {
+ return this.satisfied == req.getResourceCount();
+ }
+
+ /**
+ * @return the ResourceRequestProto this request was created with.
+ */
+ final DriverRuntimeProtocol.ResourceRequestProto getRequestProto() {
+ return this.req;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
new file mode 100644
index 0000000..c2a87ff
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Manages a queue of resource requests.
+ * <p/>
+ * Requests are served front to back; a request stays at the front of the queue
+ * until all of its resources have been satisfied.
+ */
+@Private
+@DriverSide
+final class ResourceRequestQueue {
+
+ private final BlockingQueue<ResourceRequest> requestQueue = new LinkedBlockingQueue<>();
+
+ /**
+ * Add a request to the end of the queue.
+ *
+ * @param req the request to be added.
+ */
+ final void add(final ResourceRequest req) {
+ this.requestQueue.add(req);
+ }
+
+ /**
+ * @return true if there are outstanding resource requests. false otherwise.
+ */
+ final boolean hasOutStandingRequests() {
+ return !this.requestQueue.isEmpty();
+ }
+
+ /**
+ * Satisfies one resource for the front-most request. If that satisfies the
+ * request, it is removed from the queue.
+ *
+ * @return the ResourceRequestProto of the front-most request, whether or not it was removed.
+ * @throws java.util.NoSuchElementException if the queue is empty.
+ */
+ final synchronized DriverRuntimeProtocol.ResourceRequestProto satisfyOne() {
+ final ResourceRequest req = this.requestQueue.element();
+ req.satisfyOne();
+ if (req.isSatisfied()) {
+ this.requestQueue.poll();
+ }
+ return req.getRequestProto();
+ }
+
+ final synchronized int getNumberOfOutstandingRequests() { // counts queued requests, not the individual resources they still need
+ return this.requestQueue.size();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/package-info.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/package-info.java
new file mode 100644
index 0000000..d79462d
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The resource manager of the local runtime.
+ */
+package org.apache.reef.runtime.local.driver;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalFiles.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalFiles.java
new file mode 100644
index 0000000..489054f
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalFiles.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * Named parameter holding the names of files that are to be copied to all evaluators.
+ */
+@NamedParameter(doc = "The names of files that are to be copied to all evaluators.")
+public final class GlobalFiles implements Name<Set<String>> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalLibraries.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalLibraries.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalLibraries.java
new file mode 100644
index 0000000..1912829
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/GlobalLibraries.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * Named parameter holding the names of libraries that are to be copied to all evaluators.
+ * NOTE(review): the {@code doc} string below says "files", identical to GlobalFiles; confirm the intended wording.
+ */
+@NamedParameter(doc = "The names of files that are to be copied to all evaluators.")
+public final class GlobalLibraries implements Name<Set<String>> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalFiles.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalFiles.java
new file mode 100644
index 0000000..87226cf
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalFiles.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * Named parameter holding the names of files that are to be kept on the driver only.
+ */
+@NamedParameter(doc = "The names of files that are to be kept on the driver only.")
+public final class LocalFiles implements Name<Set<String>> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalLibraries.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalLibraries.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalLibraries.java
new file mode 100644
index 0000000..0aefb2b
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/LocalLibraries.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * Named parameter holding the names of libraries that are to be kept on the driver only.
+ * NOTE(review): the {@code doc} string below says "files", identical to LocalFiles; confirm the intended wording.
+ */
+@NamedParameter(doc = "The names of files that are to be kept on the driver only.")
+public final class LocalLibraries implements Name<Set<String>> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/package-info.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/package-info.java
new file mode 100644
index 0000000..fccf5c7
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/parameters/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The named parameters for the Driver executed on the local runtime.
+ */
+package org.apache.reef.runtime.local.driver.parameters;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/LoggingRunnableProcessObserver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/LoggingRunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/LoggingRunnableProcessObserver.java
new file mode 100644
index 0000000..dd0d75e
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/LoggingRunnableProcessObserver.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.process;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A RunnableProcessObserver that only writes the events it observes to the log.
+ */
+public final class LoggingRunnableProcessObserver implements RunnableProcessObserver {
+  private static final Logger LOG = Logger.getLogger(LoggingRunnableProcessObserver.class.getName());
+
+  /**
+   * Logs the start of the process at level FINE.
+   *
+   * @param processId the id of the process that started.
+   */
+  @Override
+  public void onProcessStarted(final String processId) {
+    LOG.log(Level.FINE, "Process {0} started.", processId);
+  }
+
+  /**
+   * Logs the exit of the process: at level WARNING for a non-zero exit code, at level FINE otherwise.
+   *
+   * @param processId the id of the process that exited.
+   * @param exitCode  the return code of the process.
+   */
+  @Override
+  public void onProcessExit(final String processId, final int exitCode) {
+    if (exitCode != 0) {
+      LOG.log(Level.WARNING, "Process {0} exited with return code {1}.", new Object[]{processId, exitCode});
+      return;
+    }
+    LOG.log(Level.FINE, "Process {0} exited with return code 0.", processId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
new file mode 100644
index 0000000..36ee916
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.process;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.local.driver.ResourceManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * a RunnableProcessObserver that relies events to REEF's ResourceStatusHandler
+ */
+@ThreadSafe
+public final class ReefRunnableProcessObserver implements RunnableProcessObserver {
+ private static final Logger LOG = Logger.getLogger(ReefRunnableProcessObserver.class.getName());
+
+ private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler;
+ private final InjectionFuture<ResourceManager> resourceManager;
+
+ /**
+ * @param resourceStatusHandler the event handler to inform of resource changes.
+ */
+ @Inject
+ public ReefRunnableProcessObserver(final @Parameter(RuntimeParameters.ResourceStatusHandler.class)
+ EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler,
+ final InjectionFuture<ResourceManager> resourceManager) {
+ this.resourceStatusHandler = resourceStatusHandler;
+ this.resourceManager = resourceManager;
+ }
+
+ @Override
+ public void onProcessStarted(final String processId) {
+ this.onResourceStatus(
+ DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+ .setIdentifier(processId)
+ .setState(ReefServiceProtos.State.RUNNING)
+ .build()
+ );
+ }
+
+ @Override
+ public void onProcessExit(final String processId, final int exitCode) {
+ // Note that the order here matters: We need to first inform the Driver's event handlers about the process exit
+ // and then release the resources. Otherwise, the Driver might be shutdown because of an idle condition before the
+ // message about the evaluator exit could have been sent and processed.
+ switch (exitCode) {
+ case 0:
+ this.onCleanExit(processId);
+ break;
+ default:
+ this.onUncleanExit(processId, exitCode);
+ }
+ this.resourceManager.get().onEvaluatorExit(processId);
+ }
+
+ /**
+ * Inform REEF of a cleanly exited process.
+ *
+ * @param processId
+ */
+ private void onCleanExit(final String processId) {
+ this.onResourceStatus(
+ DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+ .setIdentifier(processId)
+ .setState(ReefServiceProtos.State.DONE)
+ .setExitCode(0)
+ .build()
+ );
+ }
+
+ /**
+ * Inform REEF of an unclean process exit
+ *
+ * @param processId
+ * @param exitCode
+ */
+ private void onUncleanExit(final String processId, final int exitCode) {
+ this.onResourceStatus(
+ DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+ .setIdentifier(processId)
+ .setState(ReefServiceProtos.State.FAILED)
+ .setExitCode(exitCode)
+ .build()
+ );
+ }
+
+ private void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatus) {
+ LOG.log(Level.INFO, "Sending resource status: {0} ", resourceStatus);
+
+ // Here, we introduce an arbitrary wait. This is to make sure that at the exit of an Evaluator, the last
+ // heartbeat from that Evaluator arrives before this message. This makes sure that the local runtime behaves like
+ // a resource manager with regard to that timing.
+ try {
+ Thread.sleep(100);
+ } catch (final InterruptedException e) {
+ LOG.log(Level.FINEST, "Sleep interrupted. Event will be fired earlier than usual.");
+ }
+ this.resourceStatusHandler.onNext(resourceStatus);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java
new file mode 100644
index 0000000..d55e138
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.process;
+
+import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
+import org.apache.reef.util.OSUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * A runnable class that encapsulates a process.
+ */
+public final class RunnableProcess implements Runnable {
+
+ private static final Logger LOG = Logger.getLogger(RunnableProcess.class.getName());
+
+ private static final long DESTROY_WAIT_TIME = 100;
+
+ /**
+ * Name of the file used for STDERR redirection.
+ */
+ private final String standardErrorFileName;
+ /**
+ * Name of the file used for STDOUT redirection.
+ */
+ private final String standardOutFileName;
+
+ /**
+ * Command to execute.
+ */
+ private final List<String> command;
+ /**
+ * User supplied ID of this process.
+ */
+ private final String id;
+ /**
+ * The working folder in which the process runs. It is also where STDERR and STDOUT files will be deposited.
+ */
+ private final File folder;
+ /**
+ * The coarse-grained lock for state transition.
+ */
+ private final Lock stateLock = new ReentrantLock();
+ private final Condition doneCond = stateLock.newCondition();
+ /**
+ * This will be informed of process start and stop.
+ */
+ private final RunnableProcessObserver processObserver;
+ /**
+ * The process.
+ */
+ private Process process;
+ /**
+ * The state of the process.
+ */
+ private State state = State.INIT; // synchronized on stateLock
+
+ /**
+  * Creates a new RunnableProcess. The given folder is created if it does not exist yet.
+  *
+  * @param command               the command to execute. A copy of the list is stored.
+  * @param id                    The ID of the process. This is used to name files and in the logs created by this process.
+  * @param folder                The folder in which this will store its stdout and stderr output
+  * @param processObserver       will be informed of process state changes.
+  * @param standardOutFileName   The name of the file used for redirecting STDOUT
+  * @param standardErrorFileName The name of the file used for redirecting STDERR
+  */
+ public RunnableProcess(final List<String> command,
+                        final String id,
+                        final File folder,
+                        final RunnableProcessObserver processObserver,
+                        final String standardOutFileName,
+                        final String standardErrorFileName) {
+   this.processObserver = processObserver;
+   this.command = new ArrayList<>(command);
+   this.id = id;
+   this.folder = folder;
+   // Create the folder first and check for it afterwards: asserting before mkdirs() would reject
+   // a folder that merely does not exist yet when assertions are enabled.
+   this.folder.mkdirs();
+   assert (this.folder.isDirectory());
+   this.standardOutFileName = standardOutFileName;
+   this.standardErrorFileName = standardErrorFileName;
+   LOG.log(Level.FINEST, "RunnableProcess ready.");
+ }
+
+ /**
+  * Checks whether a transition from State 'from' to State 'to' is legal.
+  * <p>
+  * INIT may move to INIT, RUNNING or ENDED; RUNNING may only move to ENDED; ENDED is final.
+  *
+  * @param from the state to leave.
+  * @param to   the state to enter.
+  * @return true, if the state transition is legal. False otherwise.
+  */
+ private static boolean isLegal(final State from, final State to) {
+   if (from == State.INIT) {
+     return to == State.INIT || to == State.RUNNING || to == State.ENDED;
+   }
+   if (from == State.RUNNING) {
+     return to == State.ENDED;
+   }
+   // ENDED (or anything else) can't be left anymore.
+   return false;
+ }
+
+ /**
+  * Runs the configured process: spawns it, informs the observer of the start, blocks until the process
+  * exits and then informs the observer of the exit.
+  * <p>
+  * If the process can't be spawned, the failure is logged and this method returns without informing
+  * the observer.
+  *
+  * @throws java.lang.IllegalStateException if the process is already running or has been running before.
+  */
+ @Override
+ public void run() {
+   this.stateLock.lock();
+   try {
+     if (this.getState() != State.INIT) {
+       throw new IllegalStateException("The RunnableProcess can't be reused");
+     }
+
+     // Setup the stdout and stderr destinations.
+     final File errFile = new File(folder, standardErrorFileName);
+     final File outFile = new File(folder, standardOutFileName);
+
+     // Launch the process
+     try {
+       LOG.log(Level.FINEST, "Launching process \"{0}\"\nSTDERR can be found in {1}\nSTDOUT can be found in {2}",
+           new Object[]{this.id, errFile.getAbsolutePath(), outFile.getAbsolutePath()});
+       this.process = new ProcessBuilder()
+           .command(this.command)
+           .directory(this.folder)
+           .redirectError(errFile)
+           .redirectOutput(outFile)
+           .start();
+       this.setState(State.RUNNING);
+       this.processObserver.onProcessStarted(this.id);
+     } catch (final IOException ex) {
+       LOG.log(Level.SEVERE, "Unable to spawn process \"{0}\" wth command {1}\n Exception:{2}",
+           new Object[]{this.id, this.command, ex});
+       // There is no process to wait for: continuing would dereference the unset process field below.
+       return;
+     }
+   } finally {
+     this.stateLock.unlock();
+   }
+
+   try {
+     // Wait for its completion
+     final int returnValue = this.process.waitFor();
+     this.processObserver.onProcessExit(this.id, returnValue);
+     this.stateLock.lock();
+     try {
+       this.setState(State.ENDED);
+       // Wake up cancel(), which may be waiting for the process to end.
+       this.doneCond.signalAll();
+     } finally {
+       this.stateLock.unlock();
+     }
+     LOG.log(Level.FINEST, "Process \"{0}\" returned {1}", new Object[]{this.id, returnValue});
+   } catch (final InterruptedException ex) {
+     // The parameter array has two entries, so the exception is referenced as {1}.
+     LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
+         new Object[]{this.id, ex});
+     // Preserve the interrupt for whoever runs this Runnable.
+     Thread.currentThread().interrupt();
+   }
+ }
+
+
+ /**
+  * Cancels the running process if it is running.
+  * <p>
+  * The process is destroyed via Process.destroy() first. If it is still marked as running after waiting
+  * DESTROY_WAIT_TIME milliseconds, and the OS is Linux, the PID stored in the PID file is killed via
+  * OSUtils.kill().
+  */
+ public void cancel() {
+   this.stateLock.lock();
+   try {
+     if (this.processIsRunning()) {
+       this.process.destroy();
+       // Releases the lock while waiting, such that run() can record the end of the process.
+       this.doneCond.await(DESTROY_WAIT_TIME, TimeUnit.MILLISECONDS);
+     }
+
+     if (this.processIsRunning()) {
+       LOG.log(Level.WARNING, "The child process survived Process.destroy()");
+       if (OSUtils.isLinux()) {
+         LOG.log(Level.WARNING, "Attempting to kill the process via the kill command line");
+         try {
+           final long pid = readPID();
+           OSUtils.kill(pid);
+         } catch (final IOException | InterruptedException e) {
+           LOG.log(Level.SEVERE, "Unable to kill the process.", e);
+         }
+       }
+     }
+
+   } catch (final InterruptedException ex) {
+     // The parameter array has two entries, so the exception is referenced as {1}.
+     LOG.log(Level.SEVERE, "Interrupted while waiting for the process \"{0}\" to complete. Exception: {1}",
+         new Object[]{this.id, ex});
+     // Preserve the interrupt for the caller.
+     Thread.currentThread().interrupt();
+   } finally {
+     this.stateLock.unlock();
+   }
+ }
+
+ /**
+  * Reads the first line of the PID file in the working folder and parses it as a number.
+  *
+  * @return the PID stored in the PID file.
+  * @throws IOException if the file can't be read, is empty or does not contain a number.
+  */
+ private long readPID() throws IOException {
+   final String pidFileName = this.folder.getAbsolutePath() + "/" + PIDStoreStartHandler.PID_FILE_NAME;
+   try (final BufferedReader r = new BufferedReader(new FileReader(pidFileName))) {
+     final String line = r.readLine();
+     if (line == null) {
+       throw new IOException("The PID file " + pidFileName + " is empty.");
+     }
+     try {
+       return Long.parseLong(line.trim());
+     } catch (final NumberFormatException e) {
+       // Report malformed content as IOException, the failure type callers of this method handle.
+       throw new IOException("The PID file " + pidFileName + " does not contain a valid PID: " + line, e);
+     }
+   }
+ }
+
+ private boolean processIsRunning() {
+ return this.getState() == State.RUNNING;
+ }
+
+ /**
+ * Plain read of the state field; no locking is done here. The field is declared as
+ * "synchronized on stateLock".
+ *
+ * @return the current State of the process.
+ */
+ private State getState() {
+ return this.state;
+ }
+
+ /**
+ * Sets a new state for the process.
+ *
+ * @param newState
+ * @throws java.lang.IllegalStateException if the new state is illegal.
+ */
+ private void setState(final State newState) {
+ if (!isLegal(this.state, newState)) {
+ throw new IllegalStateException("Transition from " + this.state + " to " + newState + " is illegal");
+ }
+ this.state = newState;
+ }
+
+ /**
+ * The possible states of a process: INIT, RUNNING, ENDED.
+ * Which transitions between them are permitted is decided by isLegal().
+ */
+ private enum State {
+ // After initialization; this is the initial value of the state field.
+ INIT,
+ // The process is running; set by run() after the process was spawned.
+ RUNNING,
+ // The process ended; set by run() after waitFor() returned.
+ ENDED
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessObserver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessObserver.java
new file mode 100644
index 0000000..fe58b74
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcessObserver.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.process;
+
+/**
+ * Observer interface for the events fired by RunnableProcess: process start and process exit.
+ */
+public interface RunnableProcessObserver {
+  /**
+   * This will be called right after the process is launched.
+   *
+   * @param processId the id of the process that started.
+   */
+  void onProcessStarted(String processId);
+
+  /**
+   * This will be called right after the process exited.
+   *
+   * @param processId the id of the process that exited.
+   * @param exitCode  the return code of the process.
+   */
+  void onProcessExit(String processId, int exitCode);
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/package-info.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/package-info.java
new file mode 100644
index 0000000..964b6b9
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Infrastructure for managing processes.
+ */
+package org.apache.reef.runtime.local.process;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
new file mode 100644
index 0000000..658747d
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ResourceRequestQueueTest {
+
+ @Test
+ public void testEmptyAfterConstruction() {
+ final ResourceRequestQueue q = new ResourceRequestQueue();
+ Assert.assertFalse("A freshly generated queue should be empty", q.hasOutStandingRequests());
+ }
+
+ @Test
+ public void testNotEmptyAfterInsert() {
+ final ResourceRequestQueue q = new ResourceRequestQueue();
+ q.add(getAlmostSatisfied());
+ Assert.assertTrue("A queue should not be empty after an insert.", q.hasOutStandingRequests());
+ }
+
+ @Test
+ public void testSatisfaction() {
+ final ResourceRequestQueue q = new ResourceRequestQueue();
+ for (int i = 0; i < 1; ++i) {
+ q.add(getAlmostSatisfied());
+ Assert.assertTrue("A queue should not be empty after an insert.", q.hasOutStandingRequests());
+ q.satisfyOne();
+ Assert.assertFalse("The queue should be empty after all requests have been satisfied", q.hasOutStandingRequests());
+ }
+
+ final int nInsert = 10;
+ for (int i = 0; i < nInsert; ++i) {
+ q.add(getAlmostSatisfied());
+ }
+ for (int i = 0; i < nInsert; ++i) {
+ q.satisfyOne();
+ }
+ Assert.assertFalse("The queue should be empty after all requests have been satisfied", q.hasOutStandingRequests());
+ }
+
+ private ResourceRequest getAlmostSatisfied() {
+ return new ResourceRequest(DriverRuntimeProtocol.ResourceRequestProto.newBuilder().setResourceCount(1).setMemorySize(128).build());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
new file mode 100644
index 0000000..3156a02
--- /dev/null
+++ b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+public final class ResourceRequestTest {
+
+ @Test()
+ public void testInitializer() {
+ final ResourceRequest rr = get(1);
+ Assert.assertFalse("A fresh request should not be satisfied.", rr.isSatisfied());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInitializationWithNull() {
+ final ResourceRequest rr2 = new ResourceRequest(null);
+ Assert.fail("Passing null to the ResourceRequest constructor should throw an IllegalArgumentException.");
+ }
+
+
+ @Test
+ public void testSatisfaction() {
+ final int n = 10;
+ final ResourceRequest rr = get(n);
+ for (int i = 0; i < n; ++i) {
+ rr.satisfyOne();
+ }
+ Assert.assertTrue("A satisfied request should tell so", rr.isSatisfied());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testOverSatisfaction() {
+ final ResourceRequest rr = get(1);
+ rr.satisfyOne();
+ rr.satisfyOne();
+ Assert.fail("Satisfying more than the request should throw an IllegalStateException");
+ }
+
+ private ResourceRequest get(final int n) {
+ return new ResourceRequest(DriverRuntimeProtocol.ResourceRequestProto.newBuilder().setResourceCount(n).setMemorySize(128).build());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/pom.xml b/lang/java/reef-runtime-mesos/pom.xml
new file mode 100644
index 0000000..a5aa333
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.11.0-incubating-SNAPSHOT</version>
+ </parent>
+ <name>REEF Runtime for Mesos</name>
+ <artifactId>reef-runtime-mesos</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mesos</groupId>
+ <artifactId>mesos</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+ <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/avro</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc b/lang/java/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
new file mode 100644
index 0000000..ef4a152
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/avro/EvaluatorControl.avsc
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+ {
+ "namespace": "org.apache.reef.runtime.mesos.util",
+ "type": "record",
+ "name": "EvaluatorLaunch",
+ "fields": [
+ { "name": "identifier", "type": "string" },
+ { "name": "command", "type": "string" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.runtime.mesos.util",
+ "type": "record",
+ "name": "EvaluatorRelease",
+ "fields": [
+ { "name": "identifier", "type": "string" }
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.runtime.mesos.util",
+ "type": "record",
+ "name": "EvaluatorControl",
+ "fields": [
+ { "name": "evaluator_launch", "type": ["EvaluatorLaunch", "null"] },
+ { "name": "evaluator_release", "type": ["EvaluatorRelease", "null"] }
+ ]
+ }
+]
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
new file mode 100644
index 0000000..cc051d5
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/MesosClasspathProvider.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos;
+
+import net.jcip.annotations.Immutable;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * Access to the classpath according to the REEF file system standard.
+ * <p>
+ * The entries are computed once, when this class is loaded, from the {@code HADOOP_*}
+ * environment variables of the current JVM. The same list is handed out as both the
+ * prefix and the suffix of the driver and evaluator classpaths.
+ */
+@Immutable
+public final class MesosClasspathProvider implements RuntimeClasspathProvider {
+  private static final String HADOOP_CONF_DIR = System.getenv("HADOOP_CONF_DIR");
+  private static final String HADOOP_HOME = System.getenv("HADOOP_HOME");
+  private static final String HADOOP_COMMON_HOME = System.getenv("HADOOP_COMMON_HOME");
+  private static final String HADOOP_YARN_HOME = System.getenv("HADOOP_YARN_HOME");
+  private static final String HADOOP_HDFS_HOME = System.getenv("HADOOP_HDFS_HOME");
+  private static final String HADOOP_MAPRED_HOME = System.getenv("HADOOP_MAPRED_HOME");
+
+  // Used when we can't get a classpath from Hadoop.
+  // Must stay below the HADOOP_* constants: static initializers run in textual order.
+  private static final List<String> LEGACY_CLASSPATH_LIST = buildLegacyClasspath();
+
+  private final List<String> classPathPrefix;
+  private final List<String> classPathSuffix;
+
+  @Inject
+  MesosClasspathProvider() {
+    this.classPathPrefix = LEGACY_CLASSPATH_LIST;
+    this.classPathSuffix = LEGACY_CLASSPATH_LIST;
+  }
+
+  /**
+   * Assembles the legacy Hadoop classpath in its established order.
+   * Entries whose environment variable is unset or empty are left out, so the list never
+   * contains {@code null} or literal {@code "null/..."} paths.
+   *
+   * @return an unmodifiable list of classpath entries; possibly empty
+   */
+  private static List<String> buildLegacyClasspath() {
+    final List<String> entries = new ArrayList<>();
+    addEntries(entries, HADOOP_CONF_DIR, "");
+    addEntries(entries, HADOOP_HOME, "/*", "/lib/*");
+    addEntries(entries, HADOOP_COMMON_HOME, "/*", "/lib/*");
+    addEntries(entries, HADOOP_YARN_HOME, "/*", "/lib/*");
+    addEntries(entries, HADOOP_HDFS_HOME, "/*", "/lib/*");
+    addEntries(entries, HADOOP_MAPRED_HOME, "/*", "/lib/*");
+    addEntries(entries, HADOOP_HOME,
+        "/etc/hadoop",
+        "/share/hadoop/common/*",
+        "/share/hadoop/common/lib/*",
+        "/share/hadoop/yarn/*",
+        "/share/hadoop/yarn/lib/*",
+        "/share/hadoop/hdfs/*",
+        "/share/hadoop/hdfs/lib/*",
+        "/share/hadoop/mapreduce/*",
+        "/share/hadoop/mapreduce/lib/*");
+    // The class is @Immutable: callers must not be able to write through to shared state.
+    return Collections.unmodifiableList(entries);
+  }
+
+  /**
+   * Appends {@code base + suffix} for every suffix, unless {@code base} is unset.
+   *
+   * @param entries  the list to append to
+   * @param base     value of an environment variable; may be {@code null} or empty
+   * @param suffixes the path suffixes to append to {@code base}
+   */
+  private static void addEntries(final List<String> entries, final String base, final String... suffixes) {
+    if (base == null || base.isEmpty()) {
+      return;
+    }
+    for (final String suffix : suffixes) {
+      entries.add(base + suffix);
+    }
+  }
+
+  @Override
+  public List<String> getDriverClasspathPrefix() {
+    return this.classPathPrefix;
+  }
+
+  @Override
+  public List<String> getDriverClasspathSuffix() {
+    return this.classPathSuffix;
+  }
+
+  @Override
+  public List<String> getEvaluatorClasspathPrefix() {
+    return this.classPathPrefix;
+  }
+
+  @Override
+  public List<String> getEvaluatorClasspathSuffix() {
+    return this.classPathSuffix;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
new file mode 100644
index 0000000..280f2a6
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosClientConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.client.REEF;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.runtime.common.client.REEFImplementation;
+import org.apache.reef.runtime.common.client.RunningJobImpl;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.mesos.MesosClasspathProvider;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
+import org.apache.reef.runtime.mesos.util.HDFSConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * A ConfigurationModule for the Mesos resource manager.
+ * <p>
+ * CONF binds the client-side implementations (REEF, RunningJob, JobSubmissionHandler),
+ * the message codec, the Hadoop Configuration constructor and the Mesos
+ * RuntimeClasspathProvider, and maps ROOT_FOLDER and MASTER_IP onto the RootFolder and
+ * MasterIp named parameters. Set the parameters on CONF and call build() to obtain the
+ * runtime configuration.
+ */
+@Public
+@ClientSide
+public class MesosClientConfiguration extends ConfigurationModuleBuilder {
+ /**
+ * The folder in which the sub-folders for REEF drivers, one per job, will be created.
+ * If none is given, a folder "REEF_MESOS_RUNTIME" will be created in the local directory.
+ * Optional; bound to the RootFolder named parameter.
+ */
+ public static final OptionalParameter<String> ROOT_FOLDER = new OptionalParameter<>();
+
+ /**
+ * The IP address of the Mesos Master.
+ * Required; bound to the MasterIp named parameter.
+ */
+ public static final RequiredParameter<String> MASTER_IP = new RequiredParameter<>();
+
+ public static final ConfigurationModule CONF = new MesosClientConfiguration()
+ .bindImplementation(REEF.class, REEFImplementation.class)
+ .bindImplementation(RunningJob.class, RunningJobImpl.class)
+ .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+ .bindImplementation(JobSubmissionHandler.class, MesosJobSubmissionHandler.class)
+ .bindNamedParameter(RootFolder.class, ROOT_FOLDER)
+ .bindNamedParameter(MasterIp.class, MASTER_IP)
+ .bindConstructor(Configuration.class, HDFSConfigurationConstructor.class)
+ .bindImplementation(RuntimeClasspathProvider.class, MesosClasspathProvider.class)
+ .build();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
new file mode 100644
index 0000000..d43a855
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos.FileResourceProto;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.mesos.client.parameters.MasterIp;
+import org.apache.reef.runtime.mesos.client.parameters.RootFolder;
+import org.apache.reef.runtime.mesos.driver.MesosDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * Client-side handler that turns a job submission into a running REEF driver for the Mesos runtime.
+ * <p>
+ * The current implementation runs the driver as a local process, similar to reef-runtime-local.
+ * TODO: run the driver on a slave node in the cluster
+ */
+@Private
+@ClientSide
+final class MesosJobSubmissionHandler implements JobSubmissionHandler {
+  public static final String DRIVER_FOLDER_NAME = "driver";
+
+  private final ConfigurationSerializer configurationSerializer;
+  private final ClasspathProvider classpath;
+  private final REEFFileNames fileNames;
+  private final String rootFolderName;
+  private final String masterIp;
+  private final double jvmSlack;
+
+  /**
+   * @param rootFolderName          folder under which one sub-folder per job is created; kept as an absolute path
+   * @param masterIp                address of the Mesos Master, passed on to the driver configuration
+   * @param configurationSerializer used to read the submitted configuration and write the driver configuration
+   * @param fileNames               supplies the REEF folder and file names used inside the driver folder
+   * @param classpath               supplies the driver classpath for the launch command
+   * @param jvmSlack                JVM heap slack, passed on to the driver configuration
+   */
+  @Inject
+  MesosJobSubmissionHandler(final @Parameter(RootFolder.class) String rootFolderName,
+                            final @Parameter(MasterIp.class) String masterIp,
+                            final ConfigurationSerializer configurationSerializer,
+                            final REEFFileNames fileNames,
+                            final ClasspathProvider classpath,
+                            final @Parameter(JVMHeapSlack.class) double jvmSlack) {
+    this.rootFolderName = new File(rootFolderName).getAbsolutePath();
+    this.masterIp = masterIp;
+    this.configurationSerializer = configurationSerializer;
+    this.fileNames = fileNames;
+    this.classpath = classpath;
+    this.jvmSlack = jvmSlack;
+  }
+
+  /**
+   * No-op: this handler keeps no reference to the driver processes it starts.
+   */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Prepares a fresh job folder, copies the submitted local and global files into it,
+   * writes the driver configuration and starts the driver as a local process whose
+   * stdout/stderr are redirected into the driver folder.
+   *
+   * @param jobSubmissionProto the job to submit
+   * @throws RuntimeException wrapping the {@link IOException} if a folder cannot be created,
+   *                          a file cannot be copied, the configuration cannot be read or
+   *                          written, or the process cannot be started
+   */
+  @Override
+  public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+    try {
+      // The timestamp keeps folders of repeated submissions with the same identifier apart.
+      final File jobFolder = new File(new File(this.rootFolderName),
+          "/" + jobSubmissionProto.getIdentifier() + "-" + System.currentTimeMillis() + "/");
+
+      // Files.createDirectories (unlike File.mkdirs) reports failures as IOException
+      // instead of a silently ignored 'false'.
+      final File driverFolder = new File(jobFolder, DRIVER_FOLDER_NAME);
+      Files.createDirectories(driverFolder.toPath());
+
+      final File reefFolder = new File(driverFolder, this.fileNames.getREEFFolderName());
+      Files.createDirectories(reefFolder.toPath());
+
+      final File localFolder = new File(reefFolder, this.fileNames.getLocalFolderName());
+      Files.createDirectories(localFolder.toPath());
+      copyFiles(jobSubmissionProto.getLocalFileList(), driverFolder, this.fileNames.getLocalFolderPath());
+
+      final File globalFolder = new File(reefFolder, this.fileNames.getGlobalFolderName());
+      Files.createDirectories(globalFolder.toPath());
+      copyFiles(jobSubmissionProto.getGlobalFileList(), driverFolder, this.fileNames.getGlobalFolderPath());
+
+      final Configuration driverConfiguration =
+          Configurations.merge(MesosDriverConfiguration.CONF
+                  .set(MesosDriverConfiguration.MESOS_MASTER_IP, this.masterIp)
+                  .set(MesosDriverConfiguration.JOB_IDENTIFIER, jobSubmissionProto.getIdentifier())
+                  .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionProto.getRemoteId())
+                  .set(MesosDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+                  .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1) // must be 1 as there is 1 scheduler at the same time
+                  .build(),
+              this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
+      final File runtimeConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
+      this.configurationSerializer.toFile(driverConfiguration, runtimeConfigurationFile);
+
+      // The configuration path is relative: the process below runs with driverFolder as working directory.
+      final List<String> launchCommand = new JavaLaunchCommandBuilder()
+          .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
+          .setLaunchID(jobSubmissionProto.getIdentifier())
+          .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+          .setClassPath(this.classpath.getDriverClasspath())
+          .setMemory(jobSubmissionProto.getDriverMemory())
+          .build();
+
+      final File errFile = new File(driverFolder, fileNames.getDriverStderrFileName());
+      final File outFile = new File(driverFolder, fileNames.getDriverStdoutFileName());
+
+      new ProcessBuilder()
+          .command(launchCommand)
+          .directory(driverFolder)
+          .redirectError(errFile)
+          .redirectOutput(outFile)
+          .start();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to submit job " + jobSubmissionProto.getIdentifier(), e);
+    }
+  }
+
+  /**
+   * Copies each file resource to {@code <driverFolder>/<targetFolderPath>/<file name>}.
+   *
+   * @param files            the file resources to copy
+   * @param driverFolder     the driver's working folder
+   * @param targetFolderPath destination folder, relative to {@code driverFolder}
+   * @throws IOException if a file cannot be copied
+   */
+  private static void copyFiles(final Iterable<FileResourceProto> files,
+                                final File driverFolder,
+                                final String targetFolderPath) throws IOException {
+    for (final FileResourceProto file : files) {
+      final Path src = new File(file.getPath()).toPath();
+      final Path dst = new File(driverFolder, targetFolderPath + "/" + file.getName()).toPath();
+      Files.copy(src, dst);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
new file mode 100644
index 0000000..09c2882
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/MasterIp.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Named parameter carrying the IP address of the Mesos Master (short name {@code master_ip}).
+ */
+@NamedParameter(short_name = "master_ip", doc = "The ip address of Mesos Master")
+public final class MasterIp implements Name<String> {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
new file mode 100644
index 0000000..8cdc116
--- /dev/null
+++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/parameters/RootFolder.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.mesos.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Named parameter carrying the root folder where logs etc. will be stored.
+ * Defaults to {@code REEF_MESOS_RUNTIME}.
+ */
+@NamedParameter(default_value = "REEF_MESOS_RUNTIME", doc = "The root folder where logs etc. will be stored.")
+public final class RootFolder implements Name<String> {
+}
\ No newline at end of file