You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/04/14 22:16:45 UTC
incubator-reef git commit: [REEF-226] A REEF.NET client for YARN
Repository: incubator-reef
Updated Branches:
refs/heads/master c85d45a27 -> 1aae0ea41
[REEF-226] A REEF.NET client for YARN
* Refactored YarnJobSubmissionHandler to make the code reusable for
the command line launcher for .NET Drivers
* Added YarnJobSubmissionClient to be launched by the .NET API
* Updated HelloREEF to run on YARN as well. Just run
`Org.Apache.REEF.Examples.HelloREEF.exe yarn` to run it.
JIRA:
[REEF-226](https://issues.apache.org/jira/browse/REEF-226)
Pull Request:
This closes #144
Author: Markus Weimer <we...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/1aae0ea4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/1aae0ea4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/1aae0ea4
Branch: refs/heads/master
Commit: 1aae0ea41ab3a9d770f6794625435eecf8c7b2e7
Parents: c85d45a
Author: Markus Weimer <we...@apache.org>
Authored: Thu Apr 9 16:10:04 2015 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Tue Apr 14 13:14:36 2015 -0700
----------------------------------------------------------------------
.../Common/JavaClientLauncher.cs | 24 ++-
.../Org.Apache.REEF.Client.csproj | 3 +
.../Org.Apache.REEF.Client/YARN/YARNClient.cs | 74 +++++++
.../YARN/YARNClientConfiguration.cs | 34 ++++
.../YARN/YarnCommandLineEnvironment.cs | 132 ++++++++++++
.../HelloREEF.cs | 4 +
lang/java/reef-bridge-client/pom.xml | 9 +-
.../bridge/client/YarnJobSubmissionClient.java | 170 ++++++++++++++++
.../org/apache/reef/javabridge/LibLoader.java | 22 +-
.../yarn/client/YarnJobSubmissionHandler.java | 180 +++--------------
.../yarn/client/YarnSubmissionHelper.java | 202 +++++++++++++++++++
.../runtime/yarn/client/uploader/JobFolder.java | 116 +++++++++++
.../yarn/client/uploader/JobUploader.java | 67 ++++++
13 files changed, 870 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
index 2fa83fc..323b490 100644
--- a/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Common/JavaClientLauncher.cs
@@ -39,7 +39,8 @@ namespace Org.Apache.REEF.Client.Common
/// </summary>
private const string JarFolder = "./";
- private static readonly Logger Logger = Logger.GetLogger(typeof (JavaClientLauncher));
+ private static readonly Logger Logger = Logger.GetLogger(typeof(JavaClientLauncher));
+ private readonly IList<string> _additionalClasspathEntries = new List<string>();
[Inject]
private JavaClientLauncher()
@@ -152,15 +153,28 @@ namespace Org.Apache.REEF.Client.Common
var files = Directory.GetFiles(JarFolder)
.Where(x => (!string.IsNullOrWhiteSpace(x)))
.Where(e => Path.GetFileName(e).ToLower().StartsWith(ClientConstants.ClientJarFilePrefix))
- .ToArray();
+ .ToList();
- if (null == files || files.Length == 0)
+ if (files.Count == 0)
{
Exceptions.Throw(new ClasspathException(
"Unable to assemble classpath. Make sure the REEF JAR is in the current working directory."), Logger);
}
- var classPath = string.Join(";", files);
- return classPath;
+
+ var classpathEntries = new List<string>(_additionalClasspathEntries).Concat(files);
+ return string.Join(";", classpathEntries);
+ }
+
+ /// <summary>
+ /// Add entries to the classpath of the java client. They are placed before the REEF client JARs.
+ /// </summary>
+ /// <param name="entries">The classpath entries to add, kept in the order given.</param>
+ internal void AddToClassPath(IEnumerable<string> entries)
+ {
+ foreach (var entry in entries)
+ {
+ _additionalClasspathEntries.Add(entry);
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index 69c939a..bcb092d 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -55,6 +55,9 @@ under the License.
<Compile Include="Local\Parameters\LocalRuntimeDirectory.cs" />
<Compile Include="Local\Parameters\NumberOfEvaluators.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="YARN\YARNClient.cs" />
+ <Compile Include="YARN\YARNClientConfiguration.cs" />
+ <Compile Include="YARN\YarnCommandLineEnvironment.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
new file mode 100644
index 0000000..bcace19
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using System.IO;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+    /// <summary>
+    /// An IREEFClient that submits jobs to YARN: the driver folder is prepared locally and then
+    /// handed to the Java class named by <see cref="JavaClassName"/> for the actual submission.
+    /// </summary>
+    internal sealed class YARNClient : IREEFClient
+    {
+        /// <summary>
+        /// The class name that contains the Java counterpart for this client.
+        /// </summary>
+        private const string JavaClassName = "org.apache.reef.bridge.client.YarnJobSubmissionClient";
+
+        private static readonly Logger Logger = Logger.GetLogger(typeof(YARNClient));
+        private readonly DriverFolderPreparationHelper _driverFolderPreparationHelper;
+        private readonly JavaClientLauncher _javaClientLauncher;
+
+        /// <summary>
+        /// Creates the client and adds the classpath reported by `yarn classpath` to the
+        /// classpath of the Java client launcher.
+        /// </summary>
+        /// <param name="javaClientLauncher">Launches the Java side of the submission.</param>
+        /// <param name="driverFolderPreparationHelper">Populates the local driver folder.</param>
+        /// <param name="yarn">Used to query the YARN classpath.</param>
+        [Inject]
+        internal YARNClient(JavaClientLauncher javaClientLauncher,
+            DriverFolderPreparationHelper driverFolderPreparationHelper,
+            YarnCommandLineEnvironment yarn)
+        {
+            _javaClientLauncher = javaClientLauncher;
+            _javaClientLauncher.AddToClassPath(yarn.GetYarnClasspathList());
+            _driverFolderPreparationHelper = driverFolderPreparationHelper;
+        }
+
+        /// <summary>
+        /// Prepares the driver folder for the given job locally and launches the Java client
+        /// that submits it to YARN.
+        /// </summary>
+        /// <param name="jobSubmission">The job to submit.</param>
+        public void Submit(JobSubmission jobSubmission)
+        {
+            // Prepare the job submission folder
+            var driverFolderPath = CreateDriverFolder(jobSubmission.JobIdentifier);
+            Logger.Log(Level.Info, "Preparing driver folder in " + driverFolderPath);
+
+            _driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolderPath);
+
+            // Submit the driver. The Java side reads: driver folder, job id, driver memory (MB).
+            _javaClientLauncher.Launch(JavaClassName, driverFolderPath, jobSubmission.JobIdentifier,
+                jobSubmission.DriverMemory.ToString());
+            Logger.Log(Level.Info, "Submitted the Driver for execution.");
+        }
+
+        /// <summary>
+        /// Computes a timestamped path for the job submission folder under Path.GetTempPath().
+        /// This method only builds the path; it does not create the folder.
+        /// </summary>
+        /// <param name="jobId">The job identifier, used as part of the folder name.</param>
+        /// <returns>The full path of the folder to use.</returns>
+        private string CreateDriverFolder(string jobId)
+        {
+            // Invariant culture keeps the timestamp format independent of the current culture's
+            // calendar (e.g. a non-Gregorian calendar would otherwise change the year).
+            var timestamp = DateTime.Now.ToString("yyyyMMddHHmmssfff", CultureInfo.InvariantCulture);
+            return Path.GetFullPath(Path.Combine(Path.GetTempPath(), string.Join("-", "reef", jobId, timestamp)));
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
new file mode 100644
index 0000000..ad53d0c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+ /// <summary>
+ /// The ConfigurationModule for the YARN Client: binds IREEFClient to YARNClient.
+ /// </summary>
+ public sealed class YARNClientConfiguration : ConfigurationModuleBuilder
+ {
+ public static readonly ConfigurationModule ConfigurationModule = new YARNClientConfiguration()
+ .BindImplementation(GenericType<IREEFClient>.Class, GenericType<YARNClient>.Class)
+ .Build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandLineEnvironment.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandLineEnvironment.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandLineEnvironment.cs
new file mode 100644
index 0000000..753c53c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnCommandLineEnvironment.cs
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+    /// <summary>
+    /// Helper class to interact with the YARN command line found under HADOOP_HOME.
+    /// </summary>
+    internal sealed class YarnCommandLineEnvironment
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(YarnCommandLineEnvironment));
+
+        [Inject]
+        private YarnCommandLineEnvironment()
+        {
+        }
+
+        /// <summary>
+        /// Returns the full path of the directory named by the HADOOP_HOME environment variable.
+        /// </summary>
+        /// <returns>The full path to HADOOP_HOME.</returns>
+        /// <exception cref="FileNotFoundException">
+        /// If HADOOP_HOME is unset or blank, or points to a directory that doesn't exist.
+        /// </exception>
+        internal string GetHadoopHomePath()
+        {
+            var path = Environment.GetEnvironmentVariable("HADOOP_HOME");
+            if (string.IsNullOrWhiteSpace(path))
+            {
+                var ex = new FileNotFoundException("HADOOP_HOME isn't set.");
+                Exceptions.Throw(ex, Logger);
+                throw ex;
+            }
+
+            var fullPath = Path.GetFullPath(path);
+
+            if (!Directory.Exists(fullPath))
+            {
+                var ex = new FileNotFoundException("HADOOP_HOME points to [" + fullPath + "] which doesn't exist.");
+                Exceptions.Throw(ex, Logger);
+                throw ex;
+            }
+            return fullPath;
+        }
+
+        /// <summary>
+        /// Returns the full path to the `yarn.cmd` file in HADOOP_HOME\bin.
+        /// </summary>
+        /// <returns>The full path to the `yarn.cmd` file.</returns>
+        /// <exception cref="FileNotFoundException">If `yarn.cmd` can't be found there.</exception>
+        internal string GetYarnCommandPath()
+        {
+            var result = Path.Combine(GetHadoopHomePath(), "bin", "yarn.cmd");
+            if (!File.Exists(result))
+            {
+                var ex = new FileNotFoundException("Couldn't find yarn.cmd", result);
+                Exceptions.Throw(ex, Logger);
+                throw ex;
+            }
+            return result;
+        }
+
+        /// <summary>
+        /// Executes `yarn.cmd` with the given parameters and waits for it to exit.
+        /// </summary>
+        /// <param name="arguments">The arguments, joined with single spaces and passed unquoted.</param>
+        /// <returns>
+        /// The non-blank lines YARN printed to stdout, concatenated without separators.
+        /// </returns>
+        internal string Yarn(params string[] arguments)
+        {
+            var startInfo = new ProcessStartInfo
+            {
+                FileName = GetYarnCommandPath(),
+                Arguments = string.Join(" ", arguments),
+                UseShellExecute = false,
+                RedirectStandardOutput = true,
+                RedirectStandardError = false
+            };
+            var output = new StringBuilder();
+            // Dispose the Process so its OS handle is released once yarn.cmd has exited.
+            using (var process = Process.Start(startInfo))
+            {
+                if (process == null)
+                {
+                    var ex = new Exception("YARN process didn't start.");
+                    Exceptions.Throw(ex, Logger);
+                    throw ex;
+                }
+
+                process.OutputDataReceived += delegate(object sender, DataReceivedEventArgs e)
+                {
+                    if (!string.IsNullOrWhiteSpace(e.Data))
+                    {
+                        output.Append(e.Data);
+                    }
+                };
+                process.BeginOutputReadLine();
+                // The parameterless WaitForExit also waits for the asynchronous stdout reads to complete.
+                process.WaitForExit();
+
+                if (process.ExitCode != 0)
+                {
+                    Logger.Log(Level.Warning, "yarn.cmd exited with code " + process.ExitCode);
+                }
+            }
+            return output.ToString();
+        }
+
+        /// <summary>
+        /// Returns the entries of the class path printed by `yarn classpath`.
+        /// </summary>
+        /// <returns>
+        /// The distinct, non-empty, whitespace-trimmed class path entries.
+        /// </returns>
+        internal IList<string> GetYarnClasspathList()
+        {
+            // Drop empty entries (e.g. from a trailing ';'): once re-joined into a classpath,
+            // Java treats an empty entry as the current directory.
+            return Yarn("classpath")
+                .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
+                .Select(entry => entry.Trim())
+                .Where(entry => entry.Length > 0)
+                .Distinct()
+                .ToList();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 5962d38..c0c81ff 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -20,6 +20,7 @@
using System;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.Local;
+using Org.Apache.REEF.Client.YARN;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -34,6 +35,7 @@ namespace Org.Apache.REEF.Examples.HelloREEF
public sealed class HelloREEF
{
private const string Local = "local";
+ private const string YARN = "yarn";
private readonly IREEFClient _reefClient;
[Inject]
@@ -74,6 +76,8 @@ namespace Org.Apache.REEF.Examples.HelloREEF
return LocalRuntimeClientConfiguration.ConfigurationModule
.Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, "2")
.Build();
+ case YARN:
+ return YARNClientConfiguration.ConfigurationModule.Build();
default:
throw new Exception("Unknown runtime: " + name);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-bridge-client/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/pom.xml b/lang/java/reef-bridge-client/pom.xml
index 7864953..34b64d3 100644
--- a/lang/java/reef-bridge-client/pom.xml
+++ b/lang/java/reef-bridge-client/pom.xml
@@ -81,23 +81,24 @@ under the License.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
- <scope>runtime</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
- <scope>runtime</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
- <scope>runtime</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <scope>runtime</scope>
+ <scope>compile</scope>
</dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
new file mode 100644
index 0000000..bd4f009
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper;
+import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
+import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
+import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The Java-side of the C# YARN Job Submission API.
+ * <p>
+ * Invoked as {@code YarnJobSubmissionClient <driverFolder> <jobId> <driverMemoryMB>}: writes the Driver
+ * configuration into the driver folder, packages its {@code reef} subfolder as a JAR, uploads that JAR to a job
+ * folder on the DFS and submits the Driver to YARN.
+ */
+public final class YarnJobSubmissionClient {
+
+  private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName());
+  private final JobUploader uploader;
+  private final ConfigurationSerializer configurationSerializer;
+  private final REEFFileNames fileNames;
+  private final YarnConfiguration yarnConfiguration;
+  private final ClasspathProvider classpath;
+
+  @Inject
+  YarnJobSubmissionClient(final JobUploader uploader,
+                          final YarnConfiguration yarnConfiguration,
+                          final ConfigurationSerializer configurationSerializer,
+                          final REEFFileNames fileNames,
+                          final ClasspathProvider classpath) {
+    this.uploader = uploader;
+    this.configurationSerializer = configurationSerializer;
+    this.fileNames = fileNames;
+    this.yarnConfiguration = yarnConfiguration;
+    this.classpath = classpath;
+  }
+
+  /**
+   * Writes the Driver configuration for a YARN submission into the driver folder.
+   *
+   * @param driverFolder        the local driver folder; the configuration is written to the driver configuration
+   *                            path given by {@link REEFFileNames#getDriverConfigurationPath()} inside it.
+   * @param jobId               the ID of the job
+   * @param jobSubmissionFolder the path of the job folder on the DFS
+   * @throws IOException if writing the configuration file fails
+   */
+  private void addJVMConfiguration(final File driverFolder, final String jobId, final String jobSubmissionFolder)
+      throws IOException {
+    final File driverConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath());
+    final Configuration driverConfiguration = Configurations.merge(
+        Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER,
+        YarnDriverConfiguration.CONF
+            .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobSubmissionFolder)
+            .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId)
+            .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)
+            .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
+            .build());
+
+    this.configurationSerializer.toFile(driverConfiguration, driverConfigurationFile);
+  }
+
+  /**
+   * Packages the {@code reef} folder inside the driver folder as a JAR file.
+   *
+   * @param driverFolder the folder containing the `reef` folder. Only that `reef` folder will be in the JAR.
+   * @return the JAR file, created next to {@code driverFolder} and named after it with a {@code .jar} suffix
+   * @throws FileNotFoundException if {@code driverFolder} has no {@code reef} subfolder
+   * @throws IOException           if the JAR can't be written
+   */
+  private File makeJar(final File driverFolder) throws IOException {
+    final File jarFile = new File(driverFolder.getParentFile(), driverFolder.getName() + ".jar");
+    final File reefFolder = new File(driverFolder, fileNames.getREEFFolderName());
+    if (!reefFolder.isDirectory()) {
+      throw new FileNotFoundException(reefFolder.getAbsolutePath());
+    }
+
+    new JARFileMaker(jarFile).addChildren(reefFolder).close();
+    return jarFile;
+  }
+
+  /**
+   * Uploads the driver folder to the DFS and submits the Driver to YARN.
+   *
+   * @param driverFolder the folder on the local filesystem that contains the driver's working directory to be
+   *                     submitted.
+   * @param jobId        the ID of the job; also used as the YARN application name
+   * @param priority     the priority associated with this Driver
+   * @param queue        the queue to submit the driver to
+   * @param driverMemory in MB
+   * @throws IOException   if {@code driverFolder} doesn't exist or the JAR can't be created or uploaded
+   * @throws YarnException if the interaction with YARN fails
+   */
+  private void launch(final File driverFolder,
+                      final String jobId,
+                      final int priority,
+                      final String queue,
+                      final int driverMemory)
+      throws IOException, YarnException {
+    if (!driverFolder.exists()) {
+      throw new IOException("The Driver folder " + driverFolder.getAbsolutePath() + " doesn't exist.");
+    }
+
+    // ------------------------------------------------------------------------
+    // Get an application ID
+    try (final YarnSubmissionHelper submissionHelper =
+             new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath)) {
+
+      // ------------------------------------------------------------------------
+      // Prepare the JAR
+      final JobFolder jobFolderOnDFS = this.uploader.createJobFolder(submissionHelper.getApplicationId());
+      this.addJVMConfiguration(driverFolder, jobId, jobFolderOnDFS.getPath().toString());
+      final File jarFile = makeJar(driverFolder);
+      LOG.log(Level.INFO, "Created job submission jar file: {0}", jarFile);
+
+      // ------------------------------------------------------------------------
+      // Upload the JAR
+      LOG.info("Uploading job submission JAR");
+      final LocalResource jarFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(jarFile);
+      LOG.info("Uploaded job submission JAR");
+
+      // ------------------------------------------------------------------------
+      // Submit
+      submissionHelper
+          .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
+          .setApplicationName(jobId)
+          .setDriverMemory(driverMemory)
+          .setPriority(priority)
+          .setQueue(queue)
+          .submit(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE);
+    }
+  }
+
+  /**
+   * Command line entry point, invoked by the C# YARN client.
+   *
+   * @param args the driver folder, the job ID and the driver memory in MB, in that order
+   * @throws IllegalArgumentException if fewer than three arguments are given or the memory isn't an integer
+   */
+  public static void main(final String[] args) throws InjectionException, IOException, YarnException {
+    if (args.length < 3) {
+      throw new IllegalArgumentException("Usage: YarnJobSubmissionClient <driverFolder> <jobId> <driverMemoryInMB>,"
+          + " got " + args.length + " argument(s).");
+    }
+    final File driverFolder = new File(args[0]);
+    final String jobId = args[1];
+    final int driverMemory = Integer.parseInt(args[2]);
+
+    // Static for now
+    final int priority = 1;
+    final String queue = "default";
+
+    final Configuration yarnConfiguration = YarnClientConfiguration.CONF.build();
+    final YarnJobSubmissionClient client = Tang.Factory.getTang()
+        .newInjector(yarnConfiguration)
+        .getInstance(YarnJobSubmissionClient.class);
+
+    client.launch(driverFolder, jobId, priority, queue, driverMemory);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
index 1cebfe3..aee7350 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
@@ -80,9 +80,11 @@ public class LibLoader {
try {
loadBridgeDLLFromLocal();
} catch (final Throwable t) {
+ LOG.log(Level.INFO, "Unable to load bridge DLL from local folder. Attempting global folder next.", t);
try {
loadBridgeDLLFromGlobal();
} catch (final Throwable t2) {
+ LOG.log(Level.WARNING, "Unable to load bridge DLL from global folder. Attempting jar next.", t2);
loadBridgeDLLFromJAR();
}
}
@@ -92,7 +94,7 @@ public class LibLoader {
/**
* Attempts to load the bridge DLL from the global folder.
*/
- private void loadBridgeDLLFromGlobal() {
+ private void loadBridgeDLLFromGlobal() throws FileNotFoundException {
LOG.log(Level.INFO, "Attempting to load the bridge DLL from the global folder.");
loadBridgeDLLFromFile(reefFileNames.getBridgeDLLInGlobalFolderFile());
}
@@ -100,7 +102,7 @@ public class LibLoader {
/**
* Attempts to load the bridge DLL from the local folder.
*/
- private void loadBridgeDLLFromLocal() {
+ private void loadBridgeDLLFromLocal() throws FileNotFoundException {
LOG.log(Level.INFO, "Attempting to load the bridge DLL from the local folder.");
loadBridgeDLLFromFile(reefFileNames.getBridgeDLLInLocalFolderFile());
}
@@ -110,10 +112,18 @@ public class LibLoader {
*
* @param bridgeDLLFile
*/
- private static void loadBridgeDLLFromFile(final File bridgeDLLFile) {
- LOG.log(Level.INFO, "Attempting to load the bridge DLL from {0}", bridgeDLLFile);
- System.load(bridgeDLLFile.getAbsolutePath());
- LOG.log(Level.INFO, "Successfully loaded the bridge DLL from {0}", bridgeDLLFile);
+ private static void loadBridgeDLLFromFile(final File bridgeDLLFile) throws FileNotFoundException {
+ if (!bridgeDLLFile.exists()) { // checked failure: the caller catches it and tries the next DLL location
+ throw new FileNotFoundException("Unable to load Bridge DLL from " + bridgeDLLFile.getAbsolutePath() + " because the file can't be found.");
+ }
+ try {
+ LOG.log(Level.INFO, "Attempting to load the bridge DLL from {0}", bridgeDLLFile);
+ System.load(bridgeDLLFile.getAbsolutePath());
+ LOG.log(Level.INFO, "Successfully loaded the bridge DLL from {0}", bridgeDLLFile);
+ } catch (final Throwable t) { // record which file failed to load before propagating the failure
+ LOG.log(Level.WARNING, "Unable to load " + bridgeDLLFile.getAbsolutePath(), t);
+ throw t; // precise rethrow: the try body throws nothing checked, so no extra throws clause is needed
+ }
+ }
/**
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index fd2bb69..eb6b802 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -18,20 +18,10 @@
*/
package org.apache.reef.runtime.yarn.client;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
@@ -40,10 +30,10 @@ import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.JobJarMaker;
import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
+import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
-import org.apache.reef.runtime.yarn.util.YarnTypes;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Parameter;
@@ -54,9 +44,6 @@ import org.apache.reef.tang.util.ReflectionUtilities;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -67,40 +54,34 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
private static final Logger LOG = Logger.getLogger(YarnJobSubmissionHandler.class.getName());
private final YarnConfiguration yarnConfiguration;
- private final YarnClient yarnClient;
private final JobJarMaker jobJarMaker;
- private final REEFFileNames filenames;
+ private final REEFFileNames fileNames;
private final ClasspathProvider classpath;
- private final FileSystem fileSystem;
private final ConfigurationSerializer configurationSerializer;
+ private final JobUploader uploader;
private final double jvmSlack;
@Inject
YarnJobSubmissionHandler(
final YarnConfiguration yarnConfiguration,
final JobJarMaker jobJarMaker,
- final REEFFileNames filenames,
+ final REEFFileNames fileNames,
final ClasspathProvider classpath,
final ConfigurationSerializer configurationSerializer,
+ final JobUploader uploader,
final @Parameter(JVMHeapSlack.class) double jvmSlack) throws IOException {
this.yarnConfiguration = yarnConfiguration;
this.jobJarMaker = jobJarMaker;
- this.filenames = filenames;
+ this.fileNames = fileNames;
this.classpath = classpath;
this.configurationSerializer = configurationSerializer;
+ this.uploader = uploader;
this.jvmSlack = jvmSlack;
-
- this.fileSystem = FileSystem.get(yarnConfiguration);
-
- this.yarnClient = YarnClient.createYarnClient();
- this.yarnClient.init(this.yarnConfiguration);
- this.yarnClient.start();
}
@Override
public void close() {
- this.yarnClient.stop();
}
@Override
@@ -108,86 +89,24 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionProto.getIdentifier());
- try {
-
- LOG.log(Level.FINE, "Requesting Application ID from YARN.");
-
- final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
- final GetNewApplicationResponse applicationResponse = yarnClientApplication.getNewApplicationResponse();
-
- final ApplicationSubmissionContext applicationSubmissionContext =
- yarnClientApplication.getApplicationSubmissionContext();
-
- final ApplicationId applicationId = applicationSubmissionContext.getApplicationId();
-
- LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
-
- // set the application name
- applicationSubmissionContext.setApplicationName(
- "reef-job-" + jobSubmissionProto.getIdentifier());
+ try (final YarnSubmissionHelper submissionHelper =
+ new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, this.classpath)) {
LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
-
- final Path submissionFolder = new Path(
- "/tmp/" + this.filenames.getJobFolderPrefix() + applicationId.getId() + "/");
-
- final Configuration driverConfiguration =
- makeDriverConfiguration(jobSubmissionProto, submissionFolder);
-
- final File jobSubmissionFile =
- this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
-
- final Path uploadedJobJarPath = this.uploadToJobFolder(jobSubmissionFile, submissionFolder);
-
- final Map<String, LocalResource> resources = new HashMap<>(1);
- resources.put(this.filenames.getREEFFolderName(),
- this.makeLocalResourceForJarFile(uploadedJobJarPath));
-
- // SET MEMORY RESOURCE
- final int amMemory = getMemory(
- jobSubmissionProto, applicationResponse.getMaximumResourceCapability().getMemory());
- applicationSubmissionContext.setResource(Resource.newInstance(amMemory, 1));
-
- // SET EXEC COMMAND
- final List<String> launchCommand = new JavaLaunchCommandBuilder()
- .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
- .setLaunchID(jobSubmissionProto.getIdentifier())
- .setConfigurationFileName(this.filenames.getDriverConfigurationPath())
- .setClassPath(this.classpath.getDriverClasspath())
- .setMemory(amMemory)
- .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName())
- .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName())
- .build();
-
- applicationSubmissionContext.setAMContainerSpec(
- YarnTypes.getContainerLaunchContext(launchCommand, resources));
-
- applicationSubmissionContext.setPriority(getPriority(jobSubmissionProto));
-
- // Set the queue to which this application is to be submitted in the RM
- applicationSubmissionContext.setQueue(getQueue(jobSubmissionProto, "default"));
- LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", applicationId);
-
- if (LOG.isLoggable(Level.FINEST)) {
- LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
- }
-
- // TODO: this is currently being developed on a hacked 2.4.0 bits, should be 2.4.1
- final String minVersionKeepContainerOptionAvailable = "2.4.0";
-
- // when supported, set KeepContainersAcrossApplicationAttempts to be true
- // so that when driver (AM) crashes, evaluators will still be running and we can recover later.
- if (YarnTypes.isAtOrAfterVersion(minVersionKeepContainerOptionAvailable)) {
- LOG.log(
- Level.FINE,
- "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported, will set it to true.",
- minVersionKeepContainerOptionAvailable);
-
- applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
- }
-
- this.yarnClient.submitApplication(applicationSubmissionContext);
-
+ final JobFolder jobFolderOnDfs = this.uploader.createJobFolder(submissionHelper.getApplicationId());
+ final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionProto, jobFolderOnDfs.getPath());
+ final File jobSubmissionFile = this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
+ final LocalResource driverJarOnDfs = jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile);
+
+ submissionHelper
+ .addLocalResource(this.fileNames.getREEFFolderName(), driverJarOnDfs)
+ .setApplicationName(jobSubmissionProto.getIdentifier())
+ .setDriverMemory(jobSubmissionProto.getDriverMemory())
+ .setPriority(getPriority(jobSubmissionProto))
+ .setQueue(getQueue(jobSubmissionProto, "default"))
+ .submit(jobSubmissionProto.getRemoteId());
+
+ LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionProto.getIdentifier());
} catch (final YarnException | IOException e) {
throw new RuntimeException("Unable to submit Driver to YARN.", e);
}
@@ -199,7 +118,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
private Configuration makeDriverConfiguration(
final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
final Path jobFolderPath) throws IOException {
- Configuration config = this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration());
+ final Configuration config = this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration());
final String userBoundJobSubmissionDirectory = config.getNamedParameter((NamedParameterNode<?>) config.getClassHierarchy().getNode(ReflectionUtilities.getFullName(DriverJobSubmissionDirectory.class)));
LOG.log(Level.FINE, "user bound job submission Directory: " + userBoundJobSubmissionDirectory);
final String finalJobFolderPath =
@@ -215,17 +134,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
}
- private final Path uploadToJobFolder(final File file, final Path jobFolder) throws IOException {
- final Path source = new Path(file.getAbsolutePath());
- final Path destination = new Path(jobFolder, file.getName());
- LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
- this.fileSystem.copyFromLocalFile(false, true, source, destination);
- return destination;
- }
-
- private Priority getPriority(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
- return Priority.newInstance(
- jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() : 0);
+ private static int getPriority(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+ return jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() : 0;
}
/**
@@ -239,39 +149,5 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
jobSubmissionProto.getQueue() : defaultQueue;
}
- /**
- * Extract the desired driver memory from jobSubmissionProto.
- * <p/>
- * returns maxMemory if that desired amount is more than maxMemory
- */
- private int getMemory(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
- final int maxMemory) {
- final int amMemory;
- final int requestedMemory = jobSubmissionProto.getDriverMemory();
- if (requestedMemory <= maxMemory) {
- amMemory = requestedMemory;
- } else {
- LOG.log(Level.WARNING,
- "Requested {0}MB of memory for the driver. " +
- "The max on this YARN installation is {1}. " +
- "Using {1} as the memory for the driver.",
- new Object[]{requestedMemory, maxMemory});
- amMemory = maxMemory;
- }
- return amMemory;
- }
- /**
- * Creates a LocalResource instance for the JAR file referenced by the given Path
- */
- private LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
- final LocalResource localResource = Records.newRecord(LocalResource.class);
- final FileStatus status = FileContext.getFileContext(fileSystem.getUri()).getFileStatus(path);
- localResource.setType(LocalResourceType.ARCHIVE);
- localResource.setVisibility(LocalResourceVisibility.APPLICATION);
- localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
- localResource.setTimestamp(status.getModificationTime());
- localResource.setSize(status.getLen());
- return localResource;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
new file mode 100644
index 0000000..7d171cd
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.yarn.util.YarnTypes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper code that wraps the YARN Client API for our purposes.
+ */
+public final class YarnSubmissionHelper implements Closeable{
+ private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName());
+
+ private final YarnClient yarnClient;
+ private final YarnClientApplication yarnClientApplication;
+ private final GetNewApplicationResponse applicationResponse;
+ private final ApplicationSubmissionContext applicationSubmissionContext;
+ private final ApplicationId applicationId;
+ private final Map<String, LocalResource> resources = new HashMap<>();
+ private final REEFFileNames fileNames;
+ private final ClasspathProvider classpath;
+
+
+ public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
+ final REEFFileNames fileNames,
+ final ClasspathProvider classpath) throws IOException, YarnException {
+ this.fileNames = fileNames;
+ this.classpath = classpath;
+ {
+ LOG.log(Level.FINE, "Initializing YARN Client");
+ this.yarnClient = YarnClient.createYarnClient();
+ this.yarnClient.init(yarnConfiguration);
+ this.yarnClient.start();
+ LOG.log(Level.FINE, "Initialized YARN Client");
+ }
+ {
+ LOG.log(Level.FINE, "Requesting Application ID from YARN.");
+ this.yarnClientApplication = this.yarnClient.createApplication();
+ this.applicationResponse = yarnClientApplication.getNewApplicationResponse();
+ this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
+ this.applicationId = applicationSubmissionContext.getApplicationId();
+ LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
+ }
+ }
+
+ /**
+ *
+ * @return the application ID assigned by YARN.
+ */
+ public int getApplicationId() {
+ return this.applicationId.getId();
+ }
+
+ /**
+ * Set the name of the application to be submitted.
+ * @param applicationName
+ * @return
+ */
+ public YarnSubmissionHelper setApplicationName(final String applicationName) {
+ applicationSubmissionContext.setApplicationName(applicationName);
+ return this;
+ }
+
+ /**
+ * Set the amount of memory to be allocated to the Driver.
+ * @param megabytes
+ * @return
+ */
+ public YarnSubmissionHelper setDriverMemory(final int megabytes) {
+ applicationSubmissionContext.setResource(Resource.newInstance(getMemory(megabytes), 1));
+ return this;
+ }
+
+ /**
+ * Add a file to be localized on the driver.
+ * @param resourceName
+ * @param resource
+ * @return
+ */
+ public YarnSubmissionHelper addLocalResource(final String resourceName, final LocalResource resource) {
+ resources.put(resourceName, resource);
+ return this;
+ }
+
+ /**
+ * Set the priority of the job.
+ * @param priority
+ * @return
+ */
+ public YarnSubmissionHelper setPriority(final int priority) {
+ this.applicationSubmissionContext.setPriority(Priority.newInstance(priority));
+ return this;
+ }
+
+ /**
+ * Assign this job submission to a queue.
+ * @param queueName
+ * @return
+ */
+ public YarnSubmissionHelper setQueue(final String queueName) {
+ this.applicationSubmissionContext.setQueue(queueName);
+ return this;
+ }
+
+ public void submit(final String clientRemoteIdentifier) throws IOException, YarnException {
+ // SET EXEC COMMAND
+ final List<String> launchCommand = new JavaLaunchCommandBuilder()
+ .setErrorHandlerRID(clientRemoteIdentifier)
+ .setLaunchID(this.applicationSubmissionContext.getApplicationName())
+ .setConfigurationFileName(this.fileNames.getDriverConfigurationPath())
+ .setClassPath(this.classpath.getDriverClasspath())
+ .setMemory(this.applicationSubmissionContext.getResource().getMemory())
+ .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName())
+ .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStderrFileName())
+ .build();
+
+ this.applicationSubmissionContext.setAMContainerSpec(YarnTypes.getContainerLaunchContext(launchCommand, this.resources));
+
+ LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", this.applicationId);
+
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
+ }
+
+ // TODO: this is currently being developed on a hacked 2.4.0 bits, should be 2.4.1
+ final String minVersionKeepContainerOptionAvailable = "2.4.0";
+
+ // when supported, set KeepContainersAcrossApplicationAttempts to be true
+ // so that when driver (AM) crashes, evaluators will still be running and we can recover later.
+ if (YarnTypes.isAtOrAfterVersion(minVersionKeepContainerOptionAvailable)) {
+ LOG.log(
+ Level.FINE,
+ "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported, will set it to true.",
+ minVersionKeepContainerOptionAvailable);
+
+ applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
+ }
+
+ this.yarnClient.submitApplication(applicationSubmissionContext);
+ }
+
+ /**
+ * Extract the desired driver memory from jobSubmissionProto.
+ * <p/>
+ * returns maxMemory if that desired amount is more than maxMemory
+ */
+ private int getMemory(final int requestedMemory) {
+ final int maxMemory = applicationResponse.getMaximumResourceCapability().getMemory();
+ final int amMemory;
+
+ if (requestedMemory <= maxMemory) {
+ amMemory = requestedMemory;
+ } else {
+ LOG.log(Level.WARNING,
+ "Requested {0}MB of memory for the driver. " +
+ "The max on this YARN installation is {1}. " +
+ "Using {1} as the memory for the driver.",
+ new Object[]{requestedMemory, maxMemory});
+ amMemory = maxMemory;
+ }
+ return amMemory;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.yarnClient.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
new file mode 100644
index 0000000..8ea1df1
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobFolder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.uploader;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper class that represents a folder on the destination filesystem.
+ */
+public final class JobFolder {
+
+  private static final Logger LOG = Logger.getLogger(JobFolder.class.getName());
+  private final FileSystem fileSystem;
+  private final Path path;
+
+  /**
+   * Creates the folder on the given FileSystem. This constructor is only called by JobUploader.
+   *
+   * @param fileSystem the FileSystem the folder is created on.
+   * @param path       the path of the folder on {@code fileSystem}.
+   * @throws IOException when the given path can't be created on the given FileSystem.
+   */
+  JobFolder(final FileSystem fileSystem, final Path path) throws IOException {
+    this.fileSystem = fileSystem;
+    this.path = path;
+    // mkdirs() may signal failure through its return value instead of throwing.
+    if (!this.fileSystem.mkdirs(this.path)) {
+      throw new IOException("Unable to create job folder " + this.path);
+    }
+  }
+
+  /**
+   * Uploads the given file to the DFS, into this folder.
+   *
+   * @param localFile the local file to upload.
+   * @return the Path representing the file on the DFS.
+   * @throws FileNotFoundException if {@code localFile} does not exist.
+   * @throws IOException           if the copy to the DFS fails.
+   */
+  public Path upload(final File localFile) throws IOException {
+    if (!localFile.exists()) {
+      throw new FileNotFoundException(localFile.getAbsolutePath());
+    }
+
+    final Path source = new Path(localFile.getAbsolutePath());
+    final Path destination = new Path(this.path, localFile.getName());
+    try {
+      this.fileSystem.copyFromLocalFile(source, destination);
+    } catch (final IOException e) {
+      LOG.log(Level.SEVERE, "Unable to upload {0} to {1}", new Object[]{source, destination});
+      throw e;
+    }
+    LOG.log(Level.FINE, "Uploaded {0} to {1}", new Object[]{source, destination});
+
+    return destination;
+  }
+
+  /**
+   * Shortcut to first upload the file and then form a LocalResource for the YARN submission.
+   *
+   * @param localFile the local file to upload.
+   * @return a LocalResource referencing the uploaded file.
+   * @throws IOException if the upload or the status lookup fails.
+   */
+  public LocalResource uploadAsLocalResource(final File localFile) throws IOException {
+    final Path uploaded = upload(localFile);
+    return getLocalResourceForPath(uploaded);
+  }
+
+  /**
+   * Creates an ARCHIVE-typed, APPLICATION-visible LocalResource for the file referenced by the given Path.
+   *
+   * @param filePath the path of a file on the DFS.
+   * @return a LocalResource carrying the file's URL, modification time and size.
+   * @throws IOException if the file status can't be read.
+   */
+  public LocalResource getLocalResourceForPath(final Path filePath) throws IOException {
+    final LocalResource localResource = Records.newRecord(LocalResource.class);
+    final FileStatus status = FileContext.getFileContext(this.fileSystem.getUri()).getFileStatus(filePath);
+    localResource.setType(LocalResourceType.ARCHIVE);
+    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
+    localResource.setTimestamp(status.getModificationTime());
+    localResource.setSize(status.getLen());
+    return localResource;
+  }
+
+  /**
+   * @return the Path on the DFS represented by this object.
+   */
+  public Path getPath() {
+    return this.path;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1aae0ea4/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
new file mode 100644
index 0000000..29badbd
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/uploader/JobUploader.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.uploader;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+/**
+ * Helper class to upload the driver files to HDFS.
+ */
+public final class JobUploader {
+
+  private final REEFFileNames fileNames;
+  private final FileSystem fileSystem;
+
+  /**
+   * Binds the uploader to the default FileSystem of the given YARN configuration.
+   *
+   * @param yarnConfiguration configuration used to look up the FileSystem.
+   * @param fileNames         REEF file name conventions; supplies the job folder prefix.
+   * @throws IOException if the FileSystem can't be obtained.
+   */
+  @Inject
+  JobUploader(final YarnConfiguration yarnConfiguration,
+              final REEFFileNames fileNames) throws IOException {
+    this.fileSystem = FileSystem.get(yarnConfiguration);
+    this.fileNames = fileNames;
+  }
+
+  /**
+   * Creates the job folder for the given application on the DFS, under {@code /tmp/}.
+   *
+   * @param applicationId the application ID used to name the folder.
+   * @return a JobFolder through which files can be uploaded into the new folder.
+   * @throws IOException if the folder can't be created.
+   */
+  public JobFolder createJobFolder(final String applicationId) throws IOException {
+    // TODO: This really should be configurable, but wasn't in the code I moved as part of [REEF-228]
+    final String folderName = this.fileNames.getJobFolderPrefix() + applicationId;
+    final Path folder = new Path("/tmp/" + folderName + "/");
+    return new JobFolder(this.fileSystem, folder);
+  }
+
+  /**
+   * Same as {@link #createJobFolder(String)}, for numeric application IDs.
+   *
+   * @param applicationId the numeric application ID used to name the folder.
+   * @return a JobFolder through which files can be uploaded into the new folder.
+   * @throws IOException if the folder can't be created.
+   */
+  public JobFolder createJobFolder(final int applicationId) throws IOException {
+    final String idAsText = String.valueOf(applicationId);
+    return createJobFolder(idAsText);
+  }
+}