You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:52 UTC

[19/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/ClassPathBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/ClassPathBuilder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/ClassPathBuilder.java
new file mode 100644
index 0000000..fa7780b
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/ClassPathBuilder.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn;
+
+import org.apache.reef.util.HadoopEnvironment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+/**
+ * A helper class to assemble a class path.
+ * <p/>
+ * It uses a LinkedHashSet internally for both the prefix and the suffix of the classpath. This makes sure that
+ * duplicate entries are avoided while the order in which entries were added is preserved.
+ */
+@NotThreadSafe
+final class ClassPathBuilder {
+  private final LinkedHashSet<String> prefix = new LinkedHashSet<>();
+  private final LinkedHashSet<String> suffix = new LinkedHashSet<>();
+
+  /**
+   * The oracle that tells us whether a given path could be a YARN configuration path.
+   *
+   * @param path
+   * @return
+   */
+  private static boolean couldBeYarnConfigurationPath(final String path) {
+    return path.contains("conf") ||
+        path.contains("etc") ||
+        path.contains(HadoopEnvironment.HADOOP_CONF_DIR);
+  }
+
+  /**
+   * Adds the given classpath entry. A guess will be made whether it refers to a configuration folder, in which case
+   * it will be added to the prefix. Else, it will be added to the suffix.
+   *
+   * @param classPathEntry
+   */
+  void add(final String classPathEntry) {
+    // Make sure that the cluster configuration is in front of user classes
+    if (couldBeYarnConfigurationPath(classPathEntry)) {
+      this.addToPrefix(classPathEntry);
+    } else {
+      this.addToSuffix(classPathEntry);
+    }
+  }
+
+  /**
+   * Adds the given classPathEntry to the classpath suffix
+   *
+   * @param classPathEntry
+   */
+  void addToSuffix(final String classPathEntry) {
+    this.suffix.add(classPathEntry);
+  }
+
+  /**
+   * Adds the given classPathEntry to the classpath prefix
+   *
+   * @param classPathEntry
+   */
+  void addToPrefix(final String classPathEntry) {
+    this.prefix.add(classPathEntry);
+  }
+
+  /**
+   * Adds all entries given using the <code>add()</code> method.
+   *
+   * @param entries
+   */
+  void addAll(final String... entries) {
+    for (final String classPathEntry : entries) {
+      this.add(classPathEntry);
+    }
+  }
+
+  /**
+   * @return the suffix in an immutable list.
+   */
+  List<String> getSuffixAsImmutableList() {
+    return Collections.unmodifiableList(new ArrayList<>(this.suffix));
+  }
+
+  /**
+   * @return the prefix in an immutable list.
+   */
+  List<String> getPrefixAsImmutableList() {
+    return Collections.unmodifiableList(new ArrayList<>(this.prefix));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/YarnClasspathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/YarnClasspathProvider.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/YarnClasspathProvider.java
new file mode 100644
index 0000000..c5a83a3
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/YarnClasspathProvider.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn;
+
+import net.jcip.annotations.Immutable;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.util.OSUtils;
+
+import javax.inject.Inject;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Access to the classpath according to the REEF file system standard.
+ */
+@Immutable
+public final class YarnClasspathProvider implements RuntimeClasspathProvider {
+  private static final Logger LOG = Logger.getLogger(YarnClasspathProvider.class.getName());
+  private static final Level CLASSPATH_LOG_LEVEL = Level.FINE;
+
+  private static final String YARN_TOO_OLD_MESSAGE = "The version of YARN you are using is too old to support classpath assembly. Reverting to legacy method.";
+  private static final String HADOOP_CONF_DIR = OSUtils.formatVariable("HADOOP_CONF_DIR");
+  private static final String HADOOP_HOME = OSUtils.formatVariable("HADOOP_HOME");
+  private static final String HADOOP_COMMON_HOME = OSUtils.formatVariable("HADOOP_COMMON_HOME");
+  private static final String HADOOP_YARN_HOME = OSUtils.formatVariable("HADOOP_YARN_HOME");
+  private static final String HADOOP_HDFS_HOME = OSUtils.formatVariable("HADOOP_HDFS_HOME");
+  private static final String HADOOP_MAPRED_HOME = OSUtils.formatVariable("HADOOP_MAPRED_HOME");
+
+  // Used when we can't get a classpath from YARN
+  private static final String[] LEGACY_CLASSPATH_LIST = new String[]{
+      HADOOP_CONF_DIR,
+      HADOOP_HOME + "/*",
+      HADOOP_HOME + "/lib/*",
+      HADOOP_COMMON_HOME + "/*",
+      HADOOP_COMMON_HOME + "/lib/*",
+      HADOOP_YARN_HOME + "/*",
+      HADOOP_YARN_HOME + "/lib/*",
+      HADOOP_HDFS_HOME + "/*",
+      HADOOP_HDFS_HOME + "/lib/*",
+      HADOOP_MAPRED_HOME + "/*",
+      HADOOP_MAPRED_HOME + "/lib/*",
+      HADOOP_HOME + "/etc/hadoop",
+      HADOOP_HOME + "/share/hadoop/common/*",
+      HADOOP_HOME + "/share/hadoop/common/lib/*",
+      HADOOP_HOME + "/share/hadoop/yarn/*",
+      HADOOP_HOME + "/share/hadoop/yarn/lib/*",
+      HADOOP_HOME + "/share/hadoop/hdfs/*",
+      HADOOP_HOME + "/share/hadoop/hdfs/lib/*",
+      HADOOP_HOME + "/share/hadoop/mapreduce/*",
+      HADOOP_HOME + "/share/hadoop/mapreduce/lib/*"
+  };
+  private final List<String> classPathPrefix;
+  private final List<String> classPathSuffix;
+
+  @Inject
+  YarnClasspathProvider(final YarnConfiguration yarnConfiguration) {
+    boolean needsLegacyClasspath = false; // will be set to true below whenever we encounter issues with the YARN Configuration
+    final ClassPathBuilder builder = new ClassPathBuilder();
+
+    try {
+      // Add the classpath actually configured on this cluster
+      final String[] yarnClassPath = yarnConfiguration.getTrimmedStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
+      if (null == yarnClassPath || yarnClassPath.length == 0) {
+        needsLegacyClasspath = true;
+      } else {
+        builder.addAll(yarnClassPath);
+      }
+      final String[] yarnDefaultClassPath = YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH;
+      if (null == yarnDefaultClassPath || yarnDefaultClassPath.length == 0) {
+        LOG.log(Level.SEVERE, "YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH is empty. This indicates a broken cluster configuration");
+        needsLegacyClasspath = true;
+      } else {
+        builder.addAll(yarnDefaultClassPath);
+      }
+    } catch (final NoSuchFieldError e) {
+      // This means that one of the static fields above isn't actually in YarnConfiguration.
+      // The reason for that is most likely that we encounter a really old version of YARN.
+      needsLegacyClasspath = true;
+      LOG.log(Level.SEVERE, YARN_TOO_OLD_MESSAGE);
+    }
+
+    if (needsLegacyClasspath) {
+      builder.addAll(LEGACY_CLASSPATH_LIST);
+    }
+
+    this.classPathPrefix = builder.getPrefixAsImmutableList();
+    this.classPathSuffix = builder.getSuffixAsImmutableList();
+    this.logClasspath();
+  }
+
+  @Override
+  public List<String> getDriverClasspathPrefix() {
+    return this.classPathPrefix;
+  }
+
+  @Override
+  public List<String> getDriverClasspathSuffix() {
+    return this.classPathSuffix;
+  }
+
+  @Override
+  public List<String> getEvaluatorClasspathPrefix() {
+    return this.classPathPrefix;
+  }
+
+  @Override
+  public List<String> getEvaluatorClasspathSuffix() {
+    return this.classPathSuffix;
+  }
+
+
+  private void logClasspath() {
+    if (LOG.isLoggable(CLASSPATH_LOG_LEVEL)) {
+      final StringBuilder message = new StringBuilder("Classpath:\n\t");
+      message.append(StringUtils.join(classPathPrefix, "\n\t"));
+      message.append("\n--------------------------------\n\t");
+      message.append(StringUtils.join(classPathSuffix, "\n\t"));
+      LOG.log(CLASSPATH_LOG_LEVEL, message.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
new file mode 100644
index 0000000..c893dde
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.client.REEF;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.runtime.common.client.REEFImplementation;
+import org.apache.reef.runtime.common.client.RunningJobImpl;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.client.parameters.JobPriority;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.util.logging.LoggingSetup;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * A ConfigurationModule for the YARN resourcemanager.
+ */
+@Public
+@ClientSide
+public class YarnClientConfiguration extends ConfigurationModuleBuilder {
+  static {
+    LoggingSetup.setupCommonsLogging();
+  }
+
+  public static final OptionalParameter<String> YARN_QUEUE_NAME = new OptionalParameter<>();
+  public static final OptionalParameter<Integer> YARN_PRIORITY = new OptionalParameter<>();
+
+  public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
+
+  public static final ConfigurationModule CONF = new YarnClientConfiguration()
+      // Bind the common resourcemanager
+      .bindImplementation(REEF.class, REEFImplementation.class)
+      .bindImplementation(RunningJob.class, RunningJobImpl.class)
+          // Bind the message codec for REEF.
+      .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+          // Bind YARN
+      .bindImplementation(JobSubmissionHandler.class, YarnJobSubmissionHandler.class)
+          // Bind the parameters given by the user
+      .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
+      .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
+      .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+      .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+          // Bind external constructors. Taken from  YarnExternalConstructors.registerClientConstructors
+      .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
+      .build();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
new file mode 100644
index 0000000..fd2bb69
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.JobJarMaker;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
+import org.apache.reef.runtime.yarn.util.YarnTypes;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.tang.types.NamedParameterNode;
+import org.apache.reef.tang.util.ReflectionUtilities;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Private
+@ClientSide
+final class YarnJobSubmissionHandler implements JobSubmissionHandler {
+
+  private static final Logger LOG = Logger.getLogger(YarnJobSubmissionHandler.class.getName());
+
+  private final YarnConfiguration yarnConfiguration;
+  private final YarnClient yarnClient;
+  private final JobJarMaker jobJarMaker;
+  private final REEFFileNames filenames;
+  private final ClasspathProvider classpath;
+  private final FileSystem fileSystem;
+  private final ConfigurationSerializer configurationSerializer;
+  private final double jvmSlack;
+
+  @Inject
+  YarnJobSubmissionHandler(
+      final YarnConfiguration yarnConfiguration,
+      final JobJarMaker jobJarMaker,
+      final REEFFileNames filenames,
+      final ClasspathProvider classpath,
+      final ConfigurationSerializer configurationSerializer,
+      final @Parameter(JVMHeapSlack.class) double jvmSlack) throws IOException {
+
+    this.yarnConfiguration = yarnConfiguration;
+    this.jobJarMaker = jobJarMaker;
+    this.filenames = filenames;
+    this.classpath = classpath;
+    this.configurationSerializer = configurationSerializer;
+    this.jvmSlack = jvmSlack;
+
+    this.fileSystem = FileSystem.get(yarnConfiguration);
+
+    this.yarnClient = YarnClient.createYarnClient();
+    this.yarnClient.init(this.yarnConfiguration);
+    this.yarnClient.start();
+  }
+
+  @Override
+  public void close() {
+    this.yarnClient.stop();
+  }
+
+  @Override
+  public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+
+    LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionProto.getIdentifier());
+
+    try {
+
+      LOG.log(Level.FINE, "Requesting Application ID from YARN.");
+
+      final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
+      final GetNewApplicationResponse applicationResponse = yarnClientApplication.getNewApplicationResponse();
+
+      final ApplicationSubmissionContext applicationSubmissionContext =
+          yarnClientApplication.getApplicationSubmissionContext();
+
+      final ApplicationId applicationId = applicationSubmissionContext.getApplicationId();
+
+      LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
+
+      // set the application name
+      applicationSubmissionContext.setApplicationName(
+          "reef-job-" + jobSubmissionProto.getIdentifier());
+
+      LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
+
+      final Path submissionFolder = new Path(
+          "/tmp/" + this.filenames.getJobFolderPrefix() + applicationId.getId() + "/");
+
+      final Configuration driverConfiguration =
+          makeDriverConfiguration(jobSubmissionProto, submissionFolder);
+
+      final File jobSubmissionFile =
+          this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
+
+      final Path uploadedJobJarPath = this.uploadToJobFolder(jobSubmissionFile, submissionFolder);
+
+      final Map<String, LocalResource> resources = new HashMap<>(1);
+      resources.put(this.filenames.getREEFFolderName(),
+          this.makeLocalResourceForJarFile(uploadedJobJarPath));
+
+      // SET MEMORY RESOURCE
+      final int amMemory = getMemory(
+          jobSubmissionProto, applicationResponse.getMaximumResourceCapability().getMemory());
+      applicationSubmissionContext.setResource(Resource.newInstance(amMemory, 1));
+
+      // SET EXEC COMMAND
+      final List<String> launchCommand = new JavaLaunchCommandBuilder()
+          .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
+          .setLaunchID(jobSubmissionProto.getIdentifier())
+          .setConfigurationFileName(this.filenames.getDriverConfigurationPath())
+          .setClassPath(this.classpath.getDriverClasspath())
+          .setMemory(amMemory)
+          .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName())
+          .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName())
+          .build();
+
+      applicationSubmissionContext.setAMContainerSpec(
+          YarnTypes.getContainerLaunchContext(launchCommand, resources));
+
+      applicationSubmissionContext.setPriority(getPriority(jobSubmissionProto));
+
+      // Set the queue to which this application is to be submitted in the RM
+      applicationSubmissionContext.setQueue(getQueue(jobSubmissionProto, "default"));
+      LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", applicationId);
+
+      if (LOG.isLoggable(Level.FINEST)) {
+        LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
+      }
+
+      // TODO: this is currently being developed on hacked 2.4.0 bits; should be 2.4.1
+      final String minVersionKeepContainerOptionAvailable = "2.4.0";
+
+      // when supported, set KeepContainersAcrossApplicationAttempts to be true
+      // so that when driver (AM) crashes, evaluators will still be running and we can recover later.
+      if (YarnTypes.isAtOrAfterVersion(minVersionKeepContainerOptionAvailable)) {
+        LOG.log(
+            Level.FINE,
+            "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported, will set it to true.",
+            minVersionKeepContainerOptionAvailable);
+
+        applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
+      }
+
+      this.yarnClient.submitApplication(applicationSubmissionContext);
+
+    } catch (final YarnException | IOException e) {
+      throw new RuntimeException("Unable to submit Driver to YARN.", e);
+    }
+  }
+
+  /**
+   * Assembles the Driver configuration.
+   */
+  private Configuration makeDriverConfiguration(
+      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+      final Path jobFolderPath) throws IOException {
+    Configuration config = this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration());
+    final String userBoundJobSubmissionDirectory = config.getNamedParameter((NamedParameterNode<?>) config.getClassHierarchy().getNode(ReflectionUtilities.getFullName(DriverJobSubmissionDirectory.class)));
+    LOG.log(Level.FINE, "user bound job submission Directory: " + userBoundJobSubmissionDirectory);
+    final String finalJobFolderPath =
+        (userBoundJobSubmissionDirectory == null || userBoundJobSubmissionDirectory.isEmpty())
+            ? jobFolderPath.toString() : userBoundJobSubmissionDirectory;
+    return Configurations.merge(
+        YarnDriverConfiguration.CONF
+            .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, finalJobFolderPath)
+            .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionProto.getIdentifier())
+            .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionProto.getRemoteId())
+            .set(YarnDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+            .build(),
+        this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
+  }
+
+  private final Path uploadToJobFolder(final File file, final Path jobFolder) throws IOException {
+    final Path source = new Path(file.getAbsolutePath());
+    final Path destination = new Path(jobFolder, file.getName());
+    LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
+    this.fileSystem.copyFromLocalFile(false, true, source, destination);
+    return destination;
+  }
+
+  private Priority getPriority(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+    return Priority.newInstance(
+        jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() : 0);
+  }
+
+  /**
+   * Extract the queue name from the jobSubmissionProto or return default if none is set.
+   * <p/>
+   * TODO: Revisit this. We also have a named parameter for the queue in YarnClientConfiguration.
+   */
+  private final String getQueue(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+                                final String defaultQueue) {
+    return jobSubmissionProto.hasQueue() && !jobSubmissionProto.getQueue().isEmpty() ?
+        jobSubmissionProto.getQueue() : defaultQueue;
+  }
+
+  /**
+   * Extract the desired driver memory from jobSubmissionProto.
+   * <p/>
+   * Returns maxMemory if the desired amount is more than maxMemory.
+   */
+  private int getMemory(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+                        final int maxMemory) {
+    final int amMemory;
+    final int requestedMemory = jobSubmissionProto.getDriverMemory();
+    if (requestedMemory <= maxMemory) {
+      amMemory = requestedMemory;
+    } else {
+      LOG.log(Level.WARNING,
+          "Requested {0}MB of memory for the driver. " +
+              "The max on this YARN installation is {1}. " +
+              "Using {1} as the memory for the driver.",
+          new Object[]{requestedMemory, maxMemory});
+      amMemory = maxMemory;
+    }
+    return amMemory;
+  }
+
+  /**
+   * Creates a LocalResource instance for the JAR file referenced by the given Path
+   */
+  private LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
+    final LocalResource localResource = Records.newRecord(LocalResource.class);
+    final FileStatus status = FileContext.getFileContext(fileSystem.getUri()).getFileStatus(path);
+    localResource.setType(LocalResourceType.ARCHIVE);
+    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
+    localResource.setTimestamp(status.getModificationTime());
+    localResource.setSize(status.getLen());
+    return localResource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobPriority.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobPriority.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobPriority.java
new file mode 100644
index 0000000..a14ad98
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobPriority.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The priority of the submitted job.
+ */
+@NamedParameter(doc = "The job priority.", default_value = "0", short_name = "yarn_priority")
+public final class JobPriority implements Name<Integer> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobQueue.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobQueue.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobQueue.java
new file mode 100644
index 0000000..6017a55
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobQueue.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The queue to submit a job to.
+ */
+@NamedParameter(doc = "The job queue.", default_value = "default", short_name = "yarn_queue")
+public final class JobQueue implements Name<String> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ApplicationMasterRegistration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ApplicationMasterRegistration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ApplicationMasterRegistration.java
new file mode 100644
index 0000000..63194e7
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ApplicationMasterRegistration.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+
+/**
+ * Helper class that keeps the RegisterApplicationMasterResponse of the AM registration.
+ * The response can be stored only once; every accessor is synchronized on this instance.
+ */
+final class ApplicationMasterRegistration {
+
+  private Optional<RegisterApplicationMasterResponse> registration = Optional.empty();
+
+  @Inject
+  ApplicationMasterRegistration() {
+  }
+
+  /**
+   * @return the stored registration, as handed out by the underlying Optional.
+   */
+  synchronized RegisterApplicationMasterResponse getRegistration() {
+    final Optional<RegisterApplicationMasterResponse> current = this.registration;
+    return current.get();
+  }
+
+  /**
+   * Stores the registration information. This is a set-once field.
+   *
+   * @param registration the registration response to remember.
+   * @throws java.lang.RuntimeException if a registration had been stored before.
+   */
+  synchronized void setRegistration(final RegisterApplicationMasterResponse registration) {
+    if (!this.registration.isPresent()) {
+      this.registration = Optional.of(registration);
+      return;
+    }
+    throw new RuntimeException("Trying to re-register the AM");
+  }
+
+  /**
+   * @return true, if a registration has been stored.
+   */
+  synchronized boolean isPresent() {
+    return this.registration.isPresent();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ContainerRequestCounter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ContainerRequestCounter.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ContainerRequestCounter.java
new file mode 100644
index 0000000..59860e2
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ContainerRequestCounter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Keeps a running count that is used to track resource requests.
+ * All access to the count is synchronized on this instance.
+ */
+@Private
+@DriverSide
+final class ContainerRequestCounter {
+  private static final Logger LOG = Logger.getLogger(ContainerRequestCounter.class.getName());
+
+  private int counter = 0;
+
+  @Inject
+  ContainerRequestCounter() {
+    LOG.log(Level.FINEST, "Instantiated 'ContainerRequestCounter'");
+  }
+
+  /**
+   * Adds the given amount to the counter.
+   *
+   * @param number the amount to add to the current value.
+   */
+  synchronized void incrementBy(final int number) {
+    this.counter = this.counter + number;
+  }
+
+  /**
+   * @return the value the counter holds right now.
+   */
+  synchronized int get() {
+    return this.counter;
+  }
+
+  /**
+   * Lowers the counter by 1. A counter that is already at (or below) zero is
+   * reset to zero and a warning is logged instead.
+   */
+  synchronized void decrement() {
+    if (this.counter > 0) {
+      this.counter--;
+      return;
+    }
+    LOG.log(Level.WARNING, "requestedContainerCount cannot go below 0");
+    this.counter = 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/Containers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/Containers.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/Containers.java
new file mode 100644
index 0000000..b3f4ac1
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/Containers.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Helper class that manages the set of Containers we know about.
+ * Containers are stored under the String form of their container ID.
+ */
+@Private
+@DriverSide
+final class Containers {
+  private final Map<String, Container> containers = new ConcurrentHashMap<>();
+
+  @Inject
+  Containers() {
+  }
+
+  /**
+   * @param containerID the ID of the container to look up.
+   * @return the container for the given id.
+   * @throws java.lang.RuntimeException when the container is unknown.
+   */
+  synchronized Container get(final String containerID) {
+    final Container result = this.containers.get(containerID);
+    if (null == result) {
+      throw new RuntimeException("Requesting an unknown container: " + containerID);
+    }
+    return result;
+  }
+
+  /**
+   * Registers the given container under the String form of its ID.
+   *
+   * @param container the container to register.
+   * @throws java.lang.RuntimeException if a container with the same ID had been registered before.
+   */
+  synchronized void add(final Container container) {
+    final String containerId = container.getId().toString();
+    if (this.hasContainer(containerId)) {
+      throw new RuntimeException("Trying to add a Container that is already known: " + containerId);
+    }
+    this.containers.put(containerId, container);
+  }
+
+  /**
+   * Removes the container with the given ID.
+   *
+   * @param containerId the ID of the container to remove.
+   * @return the container that was registered before.
+   * @throws java.lang.RuntimeException if no such container existed.
+   */
+  synchronized Container removeAndGet(final String containerId) {
+    final Container result = this.containers.remove(containerId);
+    if (null == result) {
+      throw new RuntimeException("Unknown container to remove: " + containerId);
+    }
+    return result;
+  }
+
+  /**
+   * @param containerId the ID to check.
+   * @return true, if a container with this ID is known.
+   */
+  synchronized boolean hasContainer(final String containerId) {
+    return this.containers.containsKey(containerId);
+  }
+
+
+  /**
+   * @param containerId the ID of the container to look up.
+   * @return the Container stored under this containerId or an empty Optional.
+   */
+  synchronized Optional<Container> getOptional(final String containerId) {
+    return Optional.ofNullable(this.containers.get(containerId));
+  }
+
+  /**
+   * @return an Iterable of all the known container Ids. It is a copy made at the time of the call.
+   */
+  synchronized Iterable<String> getContainerIds() {
+    // Parameterized (not raw) ArrayList: avoids the unchecked conversion to Iterable<String>.
+    return new ArrayList<>(this.containers.keySet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/DefaultTrackingURLProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/DefaultTrackingURLProvider.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/DefaultTrackingURLProvider.java
new file mode 100644
index 0000000..c11b2b8
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/DefaultTrackingURLProvider.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import javax.inject.Inject;
+
+/**
+ * TrackingURLProvider implementation that reports an empty tracking URL.
+ */
+final class DefaultTrackingURLProvider implements TrackingURLProvider {
+
+  /**
+   * The tracking URL handed out by this provider: the empty String.
+   */
+  private static final String NO_TRACKING_URL = "";
+
+  @Inject
+  DefaultTrackingURLProvider() {
+  }
+
+  /**
+   * @return the empty String.
+   */
+  @Override
+  public String getTrackingUrl() {
+    return NO_TRACKING_URL;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
new file mode 100644
index 0000000..3a2ff5c
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.files.JobJarMaker;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the global files on the driver and can produce the needed FileResources for container submissions.
+ */
+@DriverSide
+final class EvaluatorSetupHelper {
+
+  private static final Logger LOG = Logger.getLogger(EvaluatorSetupHelper.class.getName());
+
+  private final REEFFileNames fileNames;
+  private final ConfigurationSerializer configurationSerializer;
+  private final TempFileCreator tempFileCreator;
+  private final UploaderToJobFolder uploader;
+  private final GlobalJarUploader globalJarUploader;
+  private final boolean deleteTempFiles;
+
+  @Inject
+  EvaluatorSetupHelper(
+      final REEFFileNames fileNames,
+      final ConfigurationSerializer configurationSerializer,
+      final TempFileCreator tempFileCreator,
+      final @Parameter(DeleteTempFiles.class) boolean deleteTempFiles,
+      final UploaderToJobFolder uploader,
+      final GlobalJarUploader globalJarUploader) throws IOException {
+    this.tempFileCreator = tempFileCreator;
+    this.deleteTempFiles = deleteTempFiles;
+    this.globalJarUploader = globalJarUploader;
+
+    this.fileNames = fileNames;
+    this.configurationSerializer = configurationSerializer;
+    this.uploader = uploader;
+  }
+
+  /**
+   * @return the map to be used in formulating the evaluator launch submission.
+   * @throws java.lang.RuntimeException if the global JAR file could not be uploaded.
+   */
+  Map<String, LocalResource> getGlobalResources() {
+    try {
+      return this.globalJarUploader.call();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to upload the global JAR file to the job folder.", e);
+    }
+  }
+
+
+  /**
+   * Sets up the LocalResources for a new Evaluator: the global resources plus a JAR that is
+   * assembled in a local staging folder (evaluator configuration and the files listed in the
+   * launch request) and then uploaded to the job folder.
+   *
+   * @param resourceLaunchProto the launch request carrying the evaluator configuration and file list.
+   * @return a new map holding the global resources and the resource for the evaluator JAR.
+   * @throws IOException if this method's file or upload steps fail with it.
+   */
+  Map<String, LocalResource> getResources(
+      final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
+      throws IOException {
+
+    final Map<String, LocalResource> result = new HashMap<>();
+    result.putAll(getGlobalResources());
+
+    final File localStagingFolder = this.tempFileCreator.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix());
+
+    // Write the configuration
+    final File configurationFile = new File(localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
+    this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchProto), configurationFile);
+
+    // Copy files to the staging folder
+    JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder);
+
+    // Make a JAR file out of it
+    final File localFile = tempFileCreator.createTempFile(
+        this.fileNames.getEvaluatorFolderPrefix(), this.fileNames.getJarFileSuffix());
+    new JARFileMaker(localFile).addChildren(localStagingFolder).close();
+
+    // Upload the JAR to the job folder
+    final Path pathToEvaluatorJar = this.uploader.uploadToJobFolder(localFile);
+    result.put(this.fileNames.getLocalFolderPath(), this.uploader.makeLocalResourceForJarFile(pathToEvaluatorJar));
+
+    if (this.deleteTempFiles) {
+      LOG.log(Level.FINE, "Marking [{0}] for deletion at the exit of this JVM and deleting [{1}]",
+          new Object[]{localFile.getAbsolutePath(), localStagingFolder.getAbsolutePath()});
+      localFile.deleteOnExit();
+      // File.delete() signals failure only through its return value and cannot remove a
+      // directory that still has entries in it; report a failed deletion instead of dropping it.
+      if (!localStagingFolder.delete()) {
+        LOG.log(Level.WARNING, "Failed to delete [{0}]", localStagingFolder.getAbsolutePath());
+      }
+    } else {
+      // {0} is the staging folder and {1} the JAR, in the order the message names them.
+      LOG.log(Level.FINE, "The evaluator staging folder will be kept at [{0}], the JAR at [{1}]",
+          new Object[]{localStagingFolder.getAbsolutePath(), localFile.getAbsolutePath()});
+    }
+    return result;
+  }
+
+  /**
+   * Assembles the configuration for an Evaluator: the configuration parsed from the launch request
+   * with TempFileCreator additionally bound to WorkingDirectoryTempFileCreator.
+   *
+   * @param resourceLaunchProto the launch request carrying the serialized evaluator configuration.
+   * @return the configuration to be written for the Evaluator.
+   * @throws IOException if the serialized configuration cannot be read.
+   */
+
+  private Configuration makeEvaluatorConfiguration(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
+      throws IOException {
+    return Tang.Factory.getTang()
+        .newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf()))
+        .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
new file mode 100644
index 0000000..c6ee91e
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Utility class that creates the JAR file with the global files on the driver and then uploads it to the job folder on
+ * (H)DFS. The resulting resource map is cached after the first successful call.
+ */
+final class GlobalJarUploader implements Callable<Map<String, LocalResource>> {
+
+  /**
+   * Supplies the file system constants (folder names, paths and the JAR suffix).
+   */
+  private final REEFFileNames fileNames;
+  /**
+   * The map handed out as the "global" resources when submitting Evaluators.
+   */
+  private final Map<String, LocalResource> globalResources = new HashMap<>(1);
+  /**
+   * Utility that performs the actual upload.
+   */
+  private final UploaderToJobFolder uploader;
+  /**
+   * True once globalResources has been filled; later calls to call() reuse the cached map.
+   */
+  private boolean isDone;
+
+  @Inject
+  GlobalJarUploader(final REEFFileNames fileNames,
+                    final UploaderToJobFolder uploader) {
+    this.fileNames = fileNames;
+    this.uploader = uploader;
+  }
+
+  /**
+   * Builds the JAR file with the global files on the driver and uploads it to the job folder on
+   * (H)DFS. The work is done on the first call only; afterwards the cached map is returned.
+   *
+   * @return the map to be used as the "global" resources when submitting Evaluators.
+   * @throws IOException if the creation of the JAR or the upload fails
+   */
+  @Override
+  public synchronized Map<String, LocalResource> call() throws IOException {
+    if (this.isDone) {
+      return this.globalResources;
+    }
+    final File globalJar = makeGlobalJar();
+    final Path uploadedJar = this.uploader.uploadToJobFolder(globalJar);
+    final String resourceKey = this.fileNames.getGlobalFolderPath();
+    final LocalResource jarResource = this.uploader.makeLocalResourceForJarFile(uploadedJar);
+    this.globalResources.put(resourceKey, jarResource);
+    this.isDone = true;
+    return this.globalResources;
+  }
+
+  /**
+   * Creates the JAR file for upload from the contents of the global folder.
+   *
+   * @return the JAR file, named after the global folder plus the JAR file suffix.
+   * @throws IOException if creating the JAR fails.
+   */
+  private File makeGlobalJar() throws IOException {
+    final String jarName = this.fileNames.getGlobalFolderName() + this.fileNames.getJarFileSuffix();
+    final File globalJar = new File(jarName);
+    new JARFileMaker(globalJar).addChildren(this.fileNames.getGlobalFolder()).close();
+    return globalJar;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
new file mode 100644
index 0000000..ec43666
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Helper that represents the REEF layer to the YARN runtime: each method forwards its
+ * argument to the matching injected event handler.
+ */
+// This is a great place to add a thread boundary, should that need arise.
+@Private
+final class REEFEventHandlers implements AutoCloseable {
+  private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> resourceAllocationHandler;
+  private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler;
+  private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandler;
+  private final EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorProtoEventHandler;
+
+  @Inject
+  REEFEventHandlers(
+      final @Parameter(RuntimeParameters.NodeDescriptorHandler.class)
+      EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorProtoEventHandler,
+      final @Parameter(RuntimeParameters.RuntimeStatusHandler.class)
+      EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusProtoEventHandler,
+      final @Parameter(RuntimeParameters.ResourceAllocationHandler.class)
+      EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> resourceAllocationHandler,
+      final @Parameter(RuntimeParameters.ResourceStatusHandler.class)
+      EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler) {
+    this.nodeDescriptorProtoEventHandler = nodeDescriptorProtoEventHandler;
+    this.runtimeStatusHandler = runtimeStatusProtoEventHandler;
+    this.resourceAllocationHandler = resourceAllocationHandler;
+    this.resourceStatusHandler = resourceStatusHandler;
+  }
+
+  /**
+   * Inform REEF of a node.
+   *
+   * @param nodeDescriptorProto the node description to pass to the node descriptor handler.
+   */
+  void onNodeDescriptor(final DriverRuntimeProtocol.NodeDescriptorProto nodeDescriptorProto) {
+    this.nodeDescriptorProtoEventHandler.onNext(nodeDescriptorProto);
+  }
+
+  /**
+   * Update REEF's view on the runtime status.
+   *
+   * @param runtimeStatusProto the status to pass to the runtime status handler.
+   */
+  void onRuntimeStatus(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+    this.runtimeStatusHandler.onNext(runtimeStatusProto);
+  }
+
+  /**
+   * Inform REEF of a fresh resource allocation.
+   *
+   * @param resourceAllocationProto the allocation to pass to the resource allocation handler.
+   */
+  void onResourceAllocation(final DriverRuntimeProtocol.ResourceAllocationProto resourceAllocationProto) {
+    this.resourceAllocationHandler.onNext(resourceAllocationProto);
+  }
+
+  /**
+   * Update REEF on a change to the status of a resource.
+   *
+   * @param resourceStatusProto the status to pass to the resource status handler.
+   */
+  void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+    this.resourceStatusHandler.onNext(resourceStatusProto);
+  }
+
+  /**
+   * Does nothing at present.
+   */
+  @Override
+  public void close() throws Exception {
+    // Empty, but here for a future where we need to close a threadpool
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/TrackingURLProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/TrackingURLProvider.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/TrackingURLProvider.java
new file mode 100644
index 0000000..bbb72a6
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/TrackingURLProvider.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * Implement this interface to set the tracking URL reported to YARN.
+ * DefaultTrackingURLProvider is used when no other implementation is bound.
+ */
+@DefaultImplementation(DefaultTrackingURLProvider.class)
+public interface TrackingURLProvider {
+  /**
+   * @return the tracking URL to report.
+   */
+  String getTrackingUrl();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
new file mode 100644
index 0000000..afc1d9c
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Uploads files to the current job folder and builds LocalResource records for uploaded JAR files.
+ */
+final class UploaderToJobFolder {
+  private static final Logger LOG = Logger.getLogger(UploaderToJobFolder.class.getName());
+
+  /**
+   * The path on (H)DFS which is used as the job's folder.
+   */
+  private final String jobSubmissionDirectory;
+  /**
+   * The FileSystem instance to use for fs operations, obtained from the YARN configuration.
+   */
+  private final FileSystem fileSystem;
+
+  @Inject
+  UploaderToJobFolder(final @Parameter(JobSubmissionDirectory.class) String jobSubmissionDirectory,
+                      final YarnConfiguration yarnConfiguration) throws IOException {
+    this.jobSubmissionDirectory = jobSubmissionDirectory;
+    this.fileSystem = FileSystem.get(yarnConfiguration);
+  }
+
+  /**
+   * Uploads the given file to the job folder on (H)DFS, keeping its file name.
+   *
+   * @param file the local file to upload.
+   * @return the destination path of the upload inside the job folder.
+   * @throws java.io.IOException if the copy fails.
+   */
+  Path uploadToJobFolder(final File file) throws IOException {
+    final Path localPath = new Path(file.getAbsolutePath());
+    final Path remotePath = new Path(this.jobSubmissionDirectory + "/" + file.getName());
+    LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{localPath, remotePath});
+    // delSrc = false keeps the local file, overwrite = true replaces an existing destination.
+    this.fileSystem.copyFromLocalFile(false, true, localPath, remotePath);
+    return remotePath;
+  }
+
+  /**
+   * Creates a LocalResource instance for the JAR file referenced by the given Path.
+   * The resource is an ARCHIVE with APPLICATION visibility; its URL, timestamp and size
+   * are taken from the file status of the path.
+   *
+   * @param path the path of the JAR file.
+   * @return the LocalResource describing the JAR file.
+   * @throws IOException if the file status cannot be obtained.
+   */
+  LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
+    final LocalResource jarResource = Records.newRecord(LocalResource.class);
+    final FileContext fileContext = FileContext.getFileContext(this.fileSystem.getUri());
+    final FileStatus jarStatus = fileContext.getFileStatus(path);
+    jarResource.setType(LocalResourceType.ARCHIVE);
+    jarResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    jarResource.setResource(ConverterUtils.getYarnUrlFromPath(jarStatus.getPath()));
+    jarResource.setTimestamp(jarStatus.getModificationTime());
+    jarResource.setSize(jarStatus.getLen());
+    return jarResource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
new file mode 100644
index 0000000..073cf8c
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.LaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.util.YarnTypes;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Resource launch handler for YARN.
+ */
+public final class YARNResourceLaunchHandler implements ResourceLaunchHandler {
+
+  private static final Logger LOG = Logger.getLogger(YARNResourceLaunchHandler.class.getName());
+
+  private final Containers containers;
+  private final InjectionFuture<YarnContainerManager> yarnContainerManager;
+  private final EvaluatorSetupHelper evaluatorSetupHelper;
+  private final REEFFileNames filenames;
+  private final ClasspathProvider classpath;
+  private final double jvmHeapFactor;
+
+  @Inject
+  YARNResourceLaunchHandler(final Containers containers,
+                            final InjectionFuture<YarnContainerManager> yarnContainerManager,
+                            final EvaluatorSetupHelper evaluatorSetupHelper,
+                            final REEFFileNames filenames,
+                            final ClasspathProvider classpath,
+                            final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) {
+    this.jvmHeapFactor = 1.0 - jvmHeapSlack;
+    LOG.log(Level.FINEST, "Instantiating 'YARNResourceLaunchHandler'");
+    this.containers = containers;
+    this.yarnContainerManager = yarnContainerManager;
+    this.evaluatorSetupHelper = evaluatorSetupHelper;
+    this.filenames = filenames;
+    this.classpath = classpath;
+    LOG.log(Level.FINE, "Instantiated 'YARNResourceLaunchHandler'");
+  }
+
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) {
+    try {
+
+      final String containerId = resourceLaunchProto.getIdentifier();
+      LOG.log(Level.FINEST, "TIME: Start ResourceLaunchProto {0}", containerId);
+      final Container container = this.containers.get(containerId);
+      LOG.log(Level.FINEST, "Setting up container launch container for id={0}", container.getId());
+      final Map<String, LocalResource> localResources =
+          this.evaluatorSetupHelper.getResources(resourceLaunchProto);
+
+      final LaunchCommandBuilder commandBuilder;
+      switch (resourceLaunchProto.getType()) {
+        case JVM:
+          commandBuilder = new JavaLaunchCommandBuilder()
+              .setClassPath(this.classpath.getEvaluatorClasspath());
+          break;
+        case CLR:
+          commandBuilder = new CLRLaunchCommandBuilder();
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Unsupported container type: " + resourceLaunchProto.getType());
+      }
+
+      final List<String> command = commandBuilder
+          .setErrorHandlerRID(resourceLaunchProto.getRemoteId())
+          .setLaunchID(resourceLaunchProto.getIdentifier())
+          .setConfigurationFileName(this.filenames.getEvaluatorConfigurationPath())
+          .setMemory((int) (this.jvmHeapFactor * container.getResource().getMemory()))
+          .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getEvaluatorStderrFileName())
+          .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getEvaluatorStdoutFileName())
+          .build();
+
+      if (LOG.isLoggable(Level.FINEST)) {
+        LOG.log(Level.FINEST,
+            "TIME: Run ResourceLaunchProto {0} command: `{1}` with resources: `{2}`",
+            new Object[]{containerId, StringUtils.join(command, ' '), localResources});
+      }
+
+      final ContainerLaunchContext ctx = YarnTypes.getContainerLaunchContext(command, localResources);
+      this.yarnContainerManager.get().submit(container, ctx);
+
+      LOG.log(Level.FINEST, "TIME: End ResourceLaunchProto {0}", containerId);
+
+    } catch (final Throwable e) {
+      LOG.log(Level.WARNING, "Error handling resource launch message: " + resourceLaunchProto, e);
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
new file mode 100644
index 0000000..dda9fb3
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.tang.InjectionFuture;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * ResourceReleaseHandler for YARN.
+ */
+public final class YARNResourceReleaseHandler implements ResourceReleaseHandler {
+
+  private static final Logger LOG = Logger.getLogger(YARNResourceReleaseHandler.class.getName());
+
+  /** Future of the container manager; resolved on every release request. */
+  private final InjectionFuture<YarnContainerManager> yarnContainerManager;
+
+  /**
+   * @param yarnContainerManager future of the manager that performs the release.
+   */
+  @Inject
+  YARNResourceReleaseHandler(final InjectionFuture<YarnContainerManager> yarnContainerManager) {
+    this.yarnContainerManager = yarnContainerManager;
+    LOG.log(Level.FINE, "Instantiated 'YARNResourceReleaseHandler'");
+  }
+
+  /**
+   * Asks the container manager to release the container identified in the message.
+   *
+   * @param resourceReleaseProto the release request carrying the container identifier.
+   */
+  @Override
+  public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto resourceReleaseProto) {
+    final String id = resourceReleaseProto.getIdentifier();
+    LOG.log(Level.FINEST, "Releasing container {0}", id);
+
+    final YarnContainerManager manager = this.yarnContainerManager.get();
+    manager.release(id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
new file mode 100644
index 0000000..c8ab5b2
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+
+/**
+ * Handler of RuntimeStart for the YARN Runtime.
+ */
+public final class YARNRuntimeStartHandler implements EventHandler<RuntimeStart> {
+
+  /** The manager that is notified when the runtime starts. */
+  private final YarnContainerManager containerManager;
+
+  /**
+   * @param yarnContainerManager the manager to notify of the runtime start.
+   */
+  @Inject
+  public YARNRuntimeStartHandler(final YarnContainerManager yarnContainerManager) {
+    this.containerManager = yarnContainerManager;
+  }
+
+  /**
+   * Forwards the start of the runtime to {@code YarnContainerManager.onStart()}.
+   *
+   * @param runtimeStart the start event; its content is not inspected.
+   */
+  @Override
+  public void onNext(final RuntimeStart runtimeStart) {
+    this.containerManager.onStart();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
new file mode 100644
index 0000000..b79c906
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+
+/**
+ * Shuts down the YARN resource manager.
+ */
+public final class YARNRuntimeStopHandler implements EventHandler<RuntimeStop> {
+
+  /** The manager that is notified when the runtime stops. */
+  private final YarnContainerManager containerManager;
+
+  /**
+   * @param yarnContainerManager the manager to notify of the runtime stop.
+   */
+  @Inject
+  YARNRuntimeStopHandler(final YarnContainerManager yarnContainerManager) {
+    this.containerManager = yarnContainerManager;
+  }
+
+  /**
+   * Forwards the stop of the runtime to {@code YarnContainerManager.onStop()}.
+   *
+   * @param runtimeStop the stop event; its content is not inspected.
+   */
+  @Override
+  public void onNext(final RuntimeStop runtimeStop) {
+    this.containerManager.onStop();
+  }
+}