You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/04/14 22:16:45 UTC

incubator-reef git commit: [REEF-226] A REEF.NET client for YARN

Repository: incubator-reef
Updated Branches:
  refs/heads/master c85d45a27 -> 1aae0ea41


[REEF-226] A REEF.NET client for YARN

  * Refactored YarnJobSubmissionHandler to make the code reusable for
    the command line launcher for .NET Drivers
  * Added YarnJobsubmissionClient to be launched by the .NET API
  * Updated HelloREEF to run on YARN as well. Just run
    `Org.Apache.REEF.Examples.HelloREEF.exe yarn` to run it.

JIRA:
  [REEF-226](https://issues.apache.org/jira/browse/REEF-226)

Pull Request:
  This closes #144

Author:    Markus Weimer <we...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/1aae0ea4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/1aae0ea4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/1aae0ea4

Branch: refs/heads/master
Commit: 1aae0ea41ab3a9d770f6794625435eecf8c7b2e7
Parents: c85d45a
Author: Markus Weimer <we...@apache.org>
Authored: Thu Apr 9 16:10:04 2015 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Tue Apr 14 13:14:36 2015 -0700

----------------------------------------------------------------------
 .../Common/JavaClientLauncher.cs                |  24 ++-
 .../Org.Apache.REEF.Client.csproj               |   3 +
 .../Org.Apache.REEF.Client/YARN/YARNClient.cs   |  74 +++++++
 .../YARN/YARNClientConfiguration.cs             |  34 ++++
 .../YARN/YarnCommandLineEnvironment.cs          | 132 ++++++++++++
 .../HelloREEF.cs                                |   4 +
 lang/java/reef-bridge-client/pom.xml            |   9 +-
 .../bridge/client/YarnJobSubmissionClient.java  | 170 ++++++++++++++++
 .../org/apache/reef/javabridge/LibLoader.java   |  22 +-
 .../yarn/client/YarnJobSubmissionHandler.java   | 180 +++--------------
 .../yarn/client/YarnSubmissionHelper.java       | 202 +++++++++++++++++++
 .../runtime/yarn/client/uploader/JobFolder.java | 116 +++++++++++
 .../yarn/client/uploader/JobUploader.java       |  67 ++++++
 13 files changed, 870 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
index 2fa83fc..323b490 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
@@ -39,7 +39,8 @@ namespace Org.Apache.REEF.Client.Common
         /// </summary>
         private const string JarFolder = "./";
 
-        private static readonly Logger Logger = Logger.GetLogger(typeof (JavaClientLauncher));
+        private static readonly Logger Logger = Logger.GetLogger(typeof(JavaClientLauncher));
+        private readonly IList<string> _additionalClasspathEntries = new List<string>();
 
         [Inject]
         private JavaClientLauncher()
@@ -152,15 +153,28 @@ namespace Org.Apache.REEF.Client.Common
             var files = Directory.GetFiles(JarFolder)
                 .Where(x => (!string.IsNullOrWhiteSpace(x)))
                 .Where(e => Path.GetFileName(e).ToLower().StartsWith(ClientConstants.ClientJarFilePrefix))
-                .ToArray();
+                .ToList();
 
-            if (null == files || files.Length == 0)
+            if (files.Count == 0)
             {
                 Exceptions.Throw(new ClasspathException(
                     "Unable to assemble classpath. Make sure the REEF JAR is in the current working directory."), Logger);
             }
-            var classPath = string.Join(";", files);
-            return classPath;
+
+            var classpathEntries = new List<string>(_additionalClasspathEntries).Concat(files);
+            return string.Join(";", classpathEntries);
+        }
+
+        /// <summary>
+        /// Add entries to the end of the classpath of the java client.
+        /// </summary>
+        /// <param name="entries"></param>
+        internal void AddToClassPath(IEnumerable<string> entries)
+        {
+            foreach (var entry in entries)
+            {
+                _additionalClasspathEntries.Add(entry);
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index 69c939a..bcb092d 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -55,6 +55,9 @@ under the License.
     <Compile Include="Local\Parameters\LocalRuntimeDirectory.cs" />
     <Compile Include="Local\Parameters\NumberOfEvaluators.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="YARN\YARNClient.cs" />
+    <Compile Include="YARN\YARNClientConfiguration.cs" />
+    <Compile Include="YARN\YarnCommandLineEnvironment.cs" />
   </ItemGroup>
   <ItemGroup>
     <None Include="packages.config" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
new file mode 100644
index 0000000..bcace19
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.IO;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+    internal sealed class YARNClient : IREEFClient
+    {
+        /// <summary>
+        /// The class name that contains the Java counterpart for this client.
+        /// </summary>
+        private const string JavaClassName = "org.apache.reef.bridge.client.YarnJobSubmissionClient";
+
+        private static readonly Logger Logger = Logger.GetLogger(typeof(YARNClient));
+        private readonly DriverFolderPreparationHelper _driverFolderPreparationHelper;
+        private readonly JavaClientLauncher _javaClientLauncher;
+
+        [Inject]
+        internal YARNClient(JavaClientLauncher javaClientLauncher,
+            DriverFolderPreparationHelper driverFolderPreparationHelper,
+            YarnCommandLineEnvironment yarn)
+        {
+            _javaClientLauncher = javaClientLauncher;
+            _javaClientLauncher.AddToClassPath(yarn.GetYarnClasspathList());
+            _driverFolderPreparationHelper = driverFolderPreparationHelper;
+        }
+
+        public void Submit(JobSubmission jobSubmission)
+        {
+            // Prepare the job submission folder
+            var driverFolderPath = CreateDriverFolder(jobSubmission.JobIdentifier);
+            Logger.Log(Level.Info, "Preparing driver folder in " + driverFolderPath);
+
+            _driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolderPath);
+
+            // Submit the driver
+            _javaClientLauncher.Launch(JavaClassName, driverFolderPath, jobSubmission.JobIdentifier,
+                jobSubmission.DriverMemory.ToString());
+            Logger.Log(Level.Info, "Submitted the Driver for execution.");
+        }
+
+        /// <summary>
+        /// Creates the temporary directory to hold the job submission.
+        /// </summary>
+        /// <returns>The path to the folder created.</returns>
+        private string CreateDriverFolder(string jobId)
+        {
+            var timestamp = DateTime.Now.ToString("yyyyMMddHHmmssfff");
+            return Path.GetFullPath(Path.Combine(Path.GetTempPath(), string.Join("-", "reef", jobId, timestamp)));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
new file mode 100644
index 0000000..ad53d0c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+    /// <summary>
+    /// The Configuration for the YARN Client
+    /// </summary>
+    public sealed class YARNClientConfiguration : ConfigurationModuleBuilder
+    {
+        public static ConfigurationModule ConfigurationModule = new YARNClientConfiguration()
+            .BindImplementation(GenericType<IREEFClient>.Class, GenericType<YARNClient>.Class)
+            .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandLineEnvironment.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandLineEnvironment.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandLineEnvironment.cs
new file mode 100644
index 0000000..753c53c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandLineEnvironment.cs
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+    /// <summary>
+    /// Helper class to interact with the YARN command line.
+    /// </summary>
+    internal sealed class YarnCommandLineEnvironment
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(YarnCommandLineEnvironment));
+
+        [Inject]
+        private YarnCommandLineEnvironment()
+        {
+        }
+
+        /// <summary>
+        /// Returns the full Path to HADOOP_HOME
+        /// </summary>
+        /// <returns>The full Path to HADOOP_HOME</returns>
+        internal string GetHadoopHomePath()
+        {
+            var path = Environment.GetEnvironmentVariable("HADOOP_HOME");
+            if (string.IsNullOrWhiteSpace(path))
+            {
+                var ex = new FileNotFoundException("HADOOP_HOME isn't set.");
+                Exceptions.Throw(ex, Logger);
+                throw ex;
+            }
+
+            var fullPath = Path.GetFullPath(path);
+
+            if (!Directory.Exists(fullPath))
+            {
+                var ex = new FileNotFoundException("HADOOP_HOME points to [" + fullPath + "] which doesn't exist.");
+                Exceptions.Throw(ex, Logger);
+                throw ex;
+            }
+            return fullPath;
+        }
+
+        /// <summary>
+        /// Returns the full Path to the `yarn.cmd` file.
+        /// </summary>
+        /// <returns>The full Path to the `yarn.cmd` file.</returns>
+        internal string GetYarnCommandPath()
+        {
+            var result = Path.Combine(GetHadoopHomePath(), "bin", "yarn.cmd");
+            if (!File.Exists(result))
+            {
+                var ex = new FileNotFoundException("Couldn't find yarn.cmd", result);
+                Exceptions.Throw(ex, Logger);
+                throw ex;
+            }
+            return result;
+        }
+
+        /// <summary>
+        /// Executes `yarn.cmd` with the given parameters.
+        /// </summary>
+        /// <param name="arguments"></param>
+        /// <returns>Whatever was printed to stdout by YARN.</returns>
+        internal string Yarn(params string[] arguments)
+        {
+            var startInfo = new ProcessStartInfo
+            {
+                FileName = GetYarnCommandPath(),
+                Arguments = string.Join(" ", arguments),
+                UseShellExecute = false,
+                RedirectStandardOutput = true,
+                RedirectStandardError = false
+            };
+            var process = Process.Start(startInfo);
+            var output = new StringBuilder();
+            if (process != null)
+            {
+                process.OutputDataReceived += delegate(object sender, DataReceivedEventArgs e)
+                {
+                    if (!string.IsNullOrWhiteSpace(e.Data))
+                    {
+                        output.Append(e.Data);
+                    }
+                };
+                process.BeginOutputReadLine();
+                process.WaitForExit();
+            }
+            else
+            {
+                var ex = new Exception("YARN process didn't start.");
+                Exceptions.Throw(ex, Logger);
+                throw ex;
+            }
+            return output.ToString();
+        }
+
+        /// <summary>
+        /// Returns the class path returned by `yarn classpath`.
+        /// </summary>
+        /// <returns>The class path returned by `yarn classpath`.</returns>
+        internal IList<string> GetYarnClasspathList()
+        {
+            return Yarn("classpath").Split(';').Distinct().ToList();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 5962d38..c0c81ff 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -20,6 +20,7 @@
 using System;
 using Org.Apache.REEF.Client.API;
 using Org.Apache.REEF.Client.Local;
+using Org.Apache.REEF.Client.YARN;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -34,6 +35,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF
     public sealed class HelloREEF
     {
         private const string Local = "local";
+        private const string YARN = "yarn";
         private readonly IREEFClient _reefClient;
 
         [Inject]
@@ -74,6 +76,8 @@ namespace Org.Apache.REEF.Examples.HelloREEF
                     return LocalRuntimeClientConfiguration.ConfigurationModule
                         .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, "2")
                         .Build();
+                case YARN:
+                    return YARNClientConfiguration.ConfigurationModule.Build();
                 default:
                     throw new Exception("Unknown runtime: " + name);
             }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-bridge-client/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/pom.xml b/lang/java/reef-bridge-client/pom.xml
index 7864953..34b64d3 100644
--- a/lang/java/reef-bridge-client/pom.xml
+++ b/lang/java/reef-bridge-client/pom.xml
@@ -81,23 +81,24 @@ under the License.
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-api</artifactId>
-            <scope>runtime</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-client</artifactId>
-            <scope>runtime</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-common</artifactId>
-            <scope>runtime</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
-            <scope>runtime</scope>
+            <scope>compile</scope>
         </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
new file mode 100644
index 0000000..bd4f009
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
+import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
+import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
+import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The Java-side of the C# YARN Job Submission API
+ */
+public final class YarnJobSubmissionClient {
+
+  private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName());
+  private final JobUploader uploader;
+  private final ConfigurationSerializer configurationSerializer;
+  private final REEFFileNames fileNames;
+  private final YarnConfiguration yarnConfiguration;
+  private final ClasspathProvider classpath;
+
+  @Inject
+  YarnJobSubmissionClient(final JobUploader uploader,
+                          final YarnConfiguration yarnConfiguration,
+                          final ConfigurationSerializer configurationSerializer,
+                          final REEFFileNames fileNames,
+                          final ClasspathProvider classpath) {
+    this.uploader = uploader;
+    this.configurationSerializer = configurationSerializer;
+    this.fileNames = fileNames;
+    this.yarnConfiguration = yarnConfiguration;
+    this.classpath = classpath;
+  }
+
+  private void addJVMConfiguration(final File driverFolder, final String jobId, final String jobSubmissionFolder) throws IOException {
+    final File driverConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
+    final Configuration driverConfiguration = Configurations.merge(
+        Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
+        YarnDriverConfiguration.CONF
+            .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
+            .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
+            .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)
+            .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+            .build());
+
+    this.configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
+  }
+
+  /**
+   * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR.
+   * @return
+   * @throws IOException
+   */
+  private File makeJar(final File driverFolder) throws IOException {
+    final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar");
+    final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName());
+    if (!reefFolder.isDirectory()) {
+      throw new FileNotFoundException(reefFolder.getAbsolutePath());
+    }
+
+    new JARFileMaker(jarFile).addChildren(reefFolder).close();
+    return jarFile;
+  }
+
+  /**
+   * @param driverFolder the folder on the local filesystem that contains the driver's working directory to be
+   *                     submitted.
+   * @param jobId        the ID of the job
+   * @param priority     the priority associated with this Driver
+   * @param queue        the queue to submit the driver to
+   * @param driverMemory in MB
+   * @throws IOException
+   * @throws YarnException
+   */
+  private void launch(final File driverFolder,
+                      final String jobId,
+                      final int priority,
+                      final String queue,
+                      final int driverMemory)
+      throws IOException, YarnException {
+    if (!driverFolder.exists()) {
+      throw new IOException("The Driver folder" + driverFolder.getAbsolutePath() + "doesn't exist.");
+    }
+
+    // ------------------------------------------------------------------------
+    // Get an application ID
+    try (final YarnSubmissionHelper submissionHelper =
+             new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath)) {
+
+
+      // ------------------------------------------------------------------------
+      // Prepare the JAR
+      final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
+      this.addJVMConfiguration(driverFolder, jobId, jobFolderOnDFS.getPath().toString());
+      final File jarFile = makeJar(driverFolder);
+      LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
+
+
+      // ------------------------------------------------------------------------
+      // Upload the JAR
+      LOG.info("Uploading job submission JAR");
+      final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile);
+      LOG.info("Uploaded job submission JAR");
+
+
+      // ------------------------------------------------------------------------
+      // Submit
+      submissionHelper
+          .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
+          .setApplicationName(jobId)
+          .setDriverMemory(driverMemory)
+          .setPriority(priority)
+          .setQueue(queue)
+          .submit(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE);
+    }
+  }
+
+  public static void main(final String[] args) throws InjectionException, IOException, YarnException {
+    final File driverFolder = new File(args[0]);
+    final String jobId = args[1];
+    final int driverMemory = Integer.valueOf(args[2]);
+
+    // Static for now
+    final int priority = 1;
+    final String queue = "default";
+
+    final Configuration yarnConfiguration = YarnClientConfiguration.CONF.build();
+    final YarnJobSubmissionClient client = Tang.Factory.getTang()
+        .newInjector(yarnConfiguration)
+        .getInstance(YarnJobSubmissionClient.class);
+
+    client.launch(driverFolder, jobId, priority, queue, driverMemory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
index 1cebfe3..aee7350 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
@@ -80,9 +80,11 @@ public class LibLoader {
     try {
       loadBridgeDLLFromLocal();
     } catch (final Throwable t) {
+      LOG.log(Level.INFO, "Unable to load bridge DLL from local folder. Attempting global folder next.", t);
       try {
         loadBridgeDLLFromGlobal();
       } catch (final Throwable t2) {
+        LOG.log(Level.WARNING, "Unable to load bridge DLL from global folder. Attempting jar next.", t2);
         loadBridgeDLLFromJAR();
       }
     }
@@ -92,7 +94,7 @@ public class LibLoader {
   /**
    * Attempts to load the bridge DLL from the global folder.
    */
-  private void loadBridgeDLLFromGlobal() {
+  private void loadBridgeDLLFromGlobal() throws FileNotFoundException {
     LOG.log(Level.INFO, "Attempting to load the bridge DLL from the global folder.");
     loadBridgeDLLFromFile(reefFileNames.getBridgeDLLInGlobalFolderFile());
   }
@@ -100,7 +102,7 @@ public class LibLoader {
   /**
    * Attempts to load the bridge DLL from the local folder.
    */
-  private void loadBridgeDLLFromLocal() {
+  private void loadBridgeDLLFromLocal() throws FileNotFoundException {
     LOG.log(Level.INFO, "Attempting to load the bridge DLL from the local folder.");
     loadBridgeDLLFromFile(reefFileNames.getBridgeDLLInLocalFolderFile());
   }
@@ -110,10 +112,18 @@ public class LibLoader {
    *
    * @param bridgeDLLFile
    */
-  private static void loadBridgeDLLFromFile(final File bridgeDLLFile) {
-    LOG.log(Level.INFO, "Attempting to load the bridge DLL from {0}", bridgeDLLFile);
-    System.load(bridgeDLLFile.getAbsolutePath());
-    LOG.log(Level.INFO, "Successfully loaded the bridge DLL from {0}", bridgeDLLFile);
+  private static void loadBridgeDLLFromFile(final File bridgeDLLFile) throws FileNotFoundException {
+    if (!bridgeDLLFile.exists()) {
+      throw new FileNotFoundException("Unable to load Bridge DLL from " + bridgeDLLFile.getAbsolutePath() + " because the file can't be found.");
+    }
+    try {
+      LOG.log(Level.INFO, "Attempting to load the bridge DLL from {0}", bridgeDLLFile);
+      System.load(bridgeDLLFile.getAbsolutePath());
+      LOG.log(Level.INFO, "Successfully loaded the bridge DLL from {0}", bridgeDLLFile);
+    } catch (final Throwable t) {
+      LOG.log(Level.WARNING, "Unable to load " + bridgeDLLFile.getAbsolutePath(), t);
+      throw t;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index fd2bb69..eb6b802 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -18,20 +18,10 @@
  */
 package org.apache.reef.runtime.yarn.client;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
@@ -40,10 +30,10 @@ import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.JobJarMaker;
 import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
 import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
+import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
 import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
-import org.apache.reef.runtime.yarn.util.YarnTypes;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.annotations.Parameter;
@@ -54,9 +44,6 @@ import org.apache.reef.tang.util.ReflectionUtilities;
 import javax.inject.Inject;
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -67,40 +54,34 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
   private static final Logger LOG = Logger.getLogger(YarnJobSubmissionHandler.class.getName());
 
   private final YarnConfiguration yarnConfiguration;
-  private final YarnClient yarnClient;
   private final JobJarMaker jobJarMaker;
-  private final REEFFileNames filenames;
+  private final REEFFileNames fileNames;
   private final ClasspathProvider classpath;
-  private final FileSystem fileSystem;
   private final ConfigurationSerializer configurationSerializer;
+  private final JobUploader uploader;
   private final double jvmSlack;
 
   @Inject
   YarnJobSubmissionHandler(
       final YarnConfiguration yarnConfiguration,
       final JobJarMaker jobJarMaker,
-      final REEFFileNames filenames,
+      final REEFFileNames fileNames,
       final ClasspathProvider classpath,
       final ConfigurationSerializer configurationSerializer,
+      final JobUploader uploader,
       final @Parameter(JVMHeapSlack.class) double jvmSlack) throws IOException {
 
     this.yarnConfiguration = yarnConfiguration;
     this.jobJarMaker = jobJarMaker;
-    this.filenames = filenames;
+    this.fileNames = fileNames;
     this.classpath = classpath;
     this.configurationSerializer = configurationSerializer;
+    this.uploader = uploader;
     this.jvmSlack = jvmSlack;
-
-    this.fileSystem = FileSystem.get(yarnConfiguration);
-
-    this.yarnClient = YarnClient.createYarnClient();
-    this.yarnClient.init(this.yarnConfiguration);
-    this.yarnClient.start();
   }
 
   @Override
   public void close() {
-    this.yarnClient.stop();
   }
 
   @Override
@@ -108,86 +89,24 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
 
     LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionProto.getIdentifier());
 
-    try {
-
-      LOG.log(Level.FINE, "Requesting Application ID from YARN.");
-
-      final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
-      final GetNewApplicationResponse applicationResponse = yarnClientApplication.getNewApplicationResponse();
-
-      final ApplicationSubmissionContext applicationSubmissionContext =
-          yarnClientApplication.getApplicationSubmissionContext();
-
-      final ApplicationId applicationId = applicationSubmissionContext.getApplicationId();
-
-      LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
-
-      // set the application name
-      applicationSubmissionContext.setApplicationName(
-          "reef-job-" + jobSubmissionProto.getIdentifier());
+    try (final YarnSubmissionHelper submissionHelper =
+             new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, this.classpath)) {
 
       LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
-
-      final Path submissionFolder = new Path(
-          "/tmp/" + this.filenames.getJobFolderPrefix() + applicationId.getId() + "/");
-
-      final Configuration driverConfiguration =
-          makeDriverConfiguration(jobSubmissionProto, submissionFolder);
-
-      final File jobSubmissionFile =
-          this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
-
-      final Path uploadedJobJarPath = this.uploadToJobFolder(jobSubmissionFile, submissionFolder);
-
-      final Map<String, LocalResource> resources = new HashMap<>(1);
-      resources.put(this.filenames.getREEFFolderName(),
-          this.makeLocalResourceForJarFile(uploadedJobJarPath));
-
-      // SET MEMORY RESOURCE
-      final int amMemory = getMemory(
-          jobSubmissionProto, applicationResponse.getMaximumResourceCapability().getMemory());
-      applicationSubmissionContext.setResource(Resource.newInstance(amMemory, 1));
-
-      // SET EXEC COMMAND
-      final List<String> launchCommand = new JavaLaunchCommandBuilder()
-          .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
-          .setLaunchID(jobSubmissionProto.getIdentifier())
-          .setConfigurationFileName(this.filenames.getDriverConfigurationPath())
-          .setClassPath(this.classpath.getDriverClasspath())
-          .setMemory(amMemory)
-          .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName())
-          .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName())
-          .build();
-
-      applicationSubmissionContext.setAMContainerSpec(
-          YarnTypes.getContainerLaunchContext(launchCommand, resources));
-
-      applicationSubmissionContext.setPriority(getPriority(jobSubmissionProto));
-
-      // Set the queue to which this application is to be submitted in the RM
-      applicationSubmissionContext.setQueue(getQueue(jobSubmissionProto, "default"));
-      LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", applicationId);
-
-      if (LOG.isLoggable(Level.FINEST)) {
-        LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
-      }
-
-      // TODO: this is currently being developed on a hacked 2.4.0 bits, should be 2.4.1
-      final String minVersionKeepContainerOptionAvailable = "2.4.0";
-
-      // when supported, set KeepContainersAcrossApplicationAttempts to be true
-      // so that when driver (AM) crashes, evaluators will still be running and we can recover later.
-      if (YarnTypes.isAtOrAfterVersion(minVersionKeepContainerOptionAvailable)) {
-        LOG.log(
-            Level.FINE,
-            "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported, will set it to true.",
-            minVersionKeepContainerOptionAvailable);
-
-        applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
-      }
-
-      this.yarnClient.submitApplication(applicationSubmissionContext);
-
+      final JobFolder jobFolderOnDfs = this.uploader.createJobFolder(submissionHelper.getApplicationId());
+      final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionProto, jobFolderOnDfs.getPath());
+      final File jobSubmissionFile = this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
+      final LocalResource driverJarOnDfs = jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile);
+
+      submissionHelper
+          .addLocalResource(this.fileNames.getREEFFolderName(), driverJarOnDfs)
+          .setApplicationName(jobSubmissionProto.getIdentifier())
+          .setDriverMemory(jobSubmissionProto.getDriverMemory())
+          .setPriority(getPriority(jobSubmissionProto))
+          .setQueue(getQueue(jobSubmissionProto, "default"))
+          .submit(jobSubmissionProto.getRemoteId());
+
+      LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionProto.getIdentifier());
     } catch (final YarnException | IOException e) {
       throw new RuntimeException("Unable to submit Driver to YARN.", e);
     }
@@ -199,7 +118,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
   private Configuration makeDriverConfiguration(
       final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
       final Path jobFolderPath) throws IOException {
-    Configuration config = this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration());
+    final Configuration config = this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration());
     final String userBoundJobSubmissionDirectory = config.getNamedParameter((NamedParameterNode<?>) config.getClassHierarchy().getNode(ReflectionUtilities.getFullName(DriverJobSubmissionDirectory.class)));
     LOG.log(Level.FINE, "user bound job submission Directory: " + userBoundJobSubmissionDirectory);
     final String finalJobFolderPath =
@@ -215,17 +134,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
         this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
   }
 
-  private final Path uploadToJobFolder(final File file, final Path jobFolder) throws IOException {
-    final Path source = new Path(file.getAbsolutePath());
-    final Path destination = new Path(jobFolder, file.getName());
-    LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
-    this.fileSystem.copyFromLocalFile(false, true, source, destination);
-    return destination;
-  }
-
-  private Priority getPriority(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
-    return Priority.newInstance(
-        jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() : 0);
+  private static int getPriority(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+    return jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() : 0;
   }
 
   /**
@@ -239,39 +149,5 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
         jobSubmissionProto.getQueue() : defaultQueue;
   }
 
-  /**
-   * Extract the desired driver memory from jobSubmissionProto.
-   * <p/>
-   * returns maxMemory if that desired amount is more than maxMemory
-   */
-  private int getMemory(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
-                        final int maxMemory) {
-    final int amMemory;
-    final int requestedMemory = jobSubmissionProto.getDriverMemory();
-    if (requestedMemory <= maxMemory) {
-      amMemory = requestedMemory;
-    } else {
-      LOG.log(Level.WARNING,
-          "Requested {0}MB of memory for the driver. " +
-              "The max on this YARN installation is {1}. " +
-              "Using {1} as the memory for the driver.",
-          new Object[]{requestedMemory, maxMemory});
-      amMemory = maxMemory;
-    }
-    return amMemory;
-  }
 
-  /**
-   * Creates a LocalResource instance for the JAR file referenced by the given Path
-   */
-  private LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
-    final LocalResource localResource = Records.newRecord(LocalResource.class);
-    final FileStatus status = FileContext.getFileContext(fileSystem.getUri()).getFileStatus(path);
-    localResource.setType(LocalResourceType.ARCHIVE);
-    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
-    localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
-    localResource.setTimestamp(status.getModificationTime());
-    localResource.setSize(status.getLen());
-    return localResource;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
new file mode 100644
index 0000000..7d171cd
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.yarn.util.YarnTypes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper code that wraps the YARN Client API for our purposes.
+ */
+public final class YarnSubmissionHelper implements Closeable{
+  private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName());
+
+  private final YarnClient yarnClient;
+  private final YarnClientApplication yarnClientApplication;
+  private final GetNewApplicationResponse applicationResponse;
+  private final ApplicationSubmissionContext applicationSubmissionContext;
+  private final ApplicationId applicationId;
+  private final Map<String, LocalResource> resources = new HashMap<>();
+  private final REEFFileNames fileNames;
+  private final ClasspathProvider classpath;
+
+
+  public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
+                              final REEFFileNames fileNames,
+                              final ClasspathProvider classpath) throws IOException, YarnException {
+    this.fileNames = fileNames;
+    this.classpath = classpath;
+    {
+      LOG.log(Level.FINE, "Initializing YARN Client");
+      this.yarnClient = YarnClient.createYarnClient();
+      this.yarnClient.init(yarnConfiguration);
+      this.yarnClient.start();
+      LOG.log(Level.FINE, "Initialized YARN Client");
+    }
+    {
+      LOG.log(Level.FINE, "Requesting Application ID from YARN.");
+      this.yarnClientApplication = this.yarnClient.createApplication();
+      this.applicationResponse = yarnClientApplication.getNewApplicationResponse();
+      this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
+      this.applicationId = applicationSubmissionContext.getApplicationId();
+      LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
+    }
+  }
+
+  /**
+   *
+   * @return the application ID assigned by YARN.
+   */
+  public int getApplicationId() {
+    return this.applicationId.getId();
+  }
+
+  /**
+   * Set the name of the application to be submitted.
+   * @param applicationName
+   * @return
+   */
+  public YarnSubmissionHelper setApplicationName(final String applicationName) {
+    applicationSubmissionContext.setApplicationName(applicationName);
+    return this;
+  }
+
+  /**
+   * Set the amount of memory to be allocated to the Driver.
+   * @param megabytes
+   * @return
+   */
+  public YarnSubmissionHelper setDriverMemory(final int megabytes) {
+    applicationSubmissionContext.setResource(Resource.newInstance(getMemory(megabytes), 1));
+    return this;
+  }
+
+  /**
+   * Add a file to be localized on the driver.
+   * @param resourceName
+   * @param resource
+   * @return
+   */
+  public YarnSubmissionHelper addLocalResource(final String resourceName, final LocalResource resource) {
+    resources.put(resourceName, resource);
+    return this;
+  }
+
+  /**
+   * Set the priority of the job.
+   * @param priority
+   * @return
+   */
+  public YarnSubmissionHelper setPriority(final int priority) {
+    this.applicationSubmissionContext.setPriority(Priority.newInstance(priority));
+    return this;
+  }
+
+  /**
+   * Assign this job submission to a queue.
+   * @param queueName
+   * @return
+   */
+  public YarnSubmissionHelper setQueue(final String queueName) {
+    this.applicationSubmissionContext.setQueue(queueName);
+    return this;
+  }
+
+  public void submit(final String clientRemoteIdentifier) throws IOException, YarnException {
+    // SET EXEC COMMAND
+    final List<String> launchCommand = new JavaLaunchCommandBuilder()
+        .setErrorHandlerRID(clientRemoteIdentifier)
+        .setLaunchID(this.applicationSubmissionContext.getApplicationName())
+        .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+        .setClassPath(this.classpath.getDriverClasspath())
+        .setMemory(this.applicationSubmissionContext.getResource().getMemory())
+        .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName())
+        .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStderrFileName())
+        .build();
+
+    this.applicationSubmissionContext.setAMContainerSpec(YarnTypes.getContainerLaunchContext(launchCommand, this.resources));
+
+    LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", this.applicationId);
+
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
+    }
+
+    // TODO: this is currently being developed on a hacked 2.4.0 bits, should be 2.4.1
+    final String minVersionKeepContainerOptionAvailable = "2.4.0";
+
+    // when supported, set KeepContainersAcrossApplicationAttempts to be true
+    // so that when driver (AM) crashes, evaluators will still be running and we can recover later.
+    if (YarnTypes.isAtOrAfterVersion(minVersionKeepContainerOptionAvailable)) {
+      LOG.log(
+          Level.FINE,
+          "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported, will set it to true.",
+          minVersionKeepContainerOptionAvailable);
+
+      applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
+    }
+
+    this.yarnClient.submitApplication(applicationSubmissionContext);
+  }
+
+  /**
+   * Extract the desired driver memory from jobSubmissionProto.
+   * <p/>
+   * returns maxMemory if that desired amount is more than maxMemory
+   */
+  private int getMemory(final int requestedMemory) {
+    final int maxMemory = applicationResponse.getMaximumResourceCapability().getMemory();
+    final int amMemory;
+
+    if (requestedMemory <= maxMemory) {
+      amMemory = requestedMemory;
+    } else {
+      LOG.log(Level.WARNING,
+          "Requested {0}MB of memory for the driver. " +
+              "The max on this YARN installation is {1}. " +
+              "Using {1} as the memory for the driver.",
+          new Object[]{requestedMemory, maxMemory});
+      amMemory = maxMemory;
+    }
+    return amMemory;
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.yarnClient.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
new file mode 100644
index 0000000..8ea1df1
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.uploader;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper class that represents a folder on the destination filesystem.
+ */
+public final class JobFolder {
+
+  private static final Logger LOG = Logger.getLogger(JobFolder.class.getName());
+  private final FileSystem fileSystem;
+  private final Path path;
+
+  /**
+   * This constructor is only called by JobUploader.
+   *
+   * @param fileSystem
+   * @param path
+   * @throws IOException when the given path can't be created on the given FileSystem
+   */
+  JobFolder(final FileSystem fileSystem, final Path path) throws IOException {
+    this.fileSystem = fileSystem;
+    this.path = path;
+    this.fileSystem.mkdirs(this.path);
+  }
+
+  /**
+   * Uploads the given file to the DFS
+   *
+   * @param localFile
+   * @return the Path representing the file on the DFS.
+   * @throws IOException
+   */
+  public Path upload(final File localFile) throws IOException {
+    if (!localFile.exists()) {
+      throw new FileNotFoundException(localFile.getAbsolutePath());
+    }
+
+    final Path source = new Path(localFile.getAbsolutePath());
+    final Path destination = new Path(this.path, localFile.getName());
+    try {
+      this.fileSystem.copyFromLocalFile(source, destination);
+    } catch (final IOException e) {
+      LOG.log(Level.SEVERE, "Unable to upload {0} to {1}", new Object[]{source, destination});
+      throw e;
+    }
+    LOG.log(Level.FINE, "Uploaded {0} to {1}", new Object[]{source, destination});
+
+    return destination;
+  }
+
+  /**
+   * Shortcut to first upload the file and then form a LocalResource for the YARN submission.
+   *
+   * @param localFile
+   * @return
+   * @throws IOException
+   */
+  public LocalResource uploadAsLocalResource(final File localFile) throws IOException {
+    final Path p = upload(localFile);
+    return getLocalResourceForPath(p);
+  }
+
+  /**
+   * Creates a LocalResource instance for the JAR file referenced by the given Path
+   */
+  public LocalResource getLocalResourceForPath(final Path path) throws IOException {
+    final LocalResource localResource = Records.newRecord(LocalResource.class);
+    final FileStatus status = FileContext.getFileContext(fileSystem.getUri()).getFileStatus(path);
+    localResource.setType(LocalResourceType.ARCHIVE);
+    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
+    localResource.setTimestamp(status.getModificationTime());
+    localResource.setSize(status.getLen());
+    return localResource;
+  }
+
+  /**
+   * @return the Path on the DFS represented by this object.
+   */
+  public Path getPath() {
+    return this.path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
new file mode 100644
index 0000000..29badbd
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.uploader;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+/**
+ * Helper class to upload the driver files to HDFS.
+ */
+public final class JobUploader {
+
+  private final FileSystem fileSystem;
+  private final REEFFileNames fileNames;
+
+  @Inject
+  JobUploader(final YarnConfiguration yarnConfiguration,
+              final REEFFileNames fileNames) throws IOException {
+    this.fileNames = fileNames;
+    this.fileSystem = FileSystem.get(yarnConfiguration);
+  }
+
+  /**
+   * Creates the Job folder on the DFS.
+   *
+   * @param applicationId
+   * @return a reference to the JobFolder that can be used to upload files to it.
+   * @throws IOException
+   */
+  public JobFolder createJobFolder(final String applicationId) throws IOException {
+    // TODO: This really should be configurable, but wasn't in the code I moved as part of [REEF-228]
+    final Path jobFolderPath = new Path("/tmp/" + this.fileNames.getJobFolderPrefix() + applicationId + "/");
+    return new JobFolder(this.fileSystem, jobFolderPath);
+  }
+
+  /**
+   * Convenience override for int ids.
+   *
+   * @param applicationId
+   * @return
+   * @throws IOException
+   */
+  public JobFolder createJobFolder(final int applicationId) throws IOException {
+    return this.createJobFolder(Integer.toString(applicationId));
+  }
+}