You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:52 UTC
[19/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/ClassPathBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/ClassPathBuilder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/ClassPathBuilder.java
new file mode 100644
index 0000000..fa7780b
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/ClassPathBuilder.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn;
+
+import org.apache.reef.util.HadoopEnvironment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+/**
+ * A helper class to assemble a class path.
+ * <p/>
+ * It uses a TreeSet internally for both a prefix and a suffix of the classpath. This makes sure that duplicate entries
+ * are avoided.
+ */
+@NotThreadSafe
+final class ClassPathBuilder {
+ private final LinkedHashSet<String> prefix = new LinkedHashSet<>();
+ private final LinkedHashSet<String> suffix = new LinkedHashSet<>();
+
+ /**
+ * The oracle that tells us whether a given path could be a YARN configuration path.
+ *
+ * @param path
+ * @return
+ */
+ private static boolean couldBeYarnConfigurationPath(final String path) {
+ return path.contains("conf") ||
+ path.contains("etc") ||
+ path.contains(HadoopEnvironment.HADOOP_CONF_DIR);
+ }
+
+ /**
+ * Adds the given classpath entry. A guess will be made whether it refers to a configuration folder, in which case
+ * it will be added to the prefix. Else, it will be added to the suffix.
+ *
+ * @param classPathEntry
+ */
+ void add(final String classPathEntry) {
+ // Make sure that the cluster configuration is in front of user classes
+ if (couldBeYarnConfigurationPath(classPathEntry)) {
+ this.addToPrefix(classPathEntry);
+ } else {
+ this.addToSuffix(classPathEntry);
+ }
+ }
+
+ /**
+ * Adds the given classPathEntry to the classpath suffix
+ *
+ * @param classPathEntry
+ */
+ void addToSuffix(final String classPathEntry) {
+ this.suffix.add(classPathEntry);
+ }
+
+ /**
+ * Adds the given classPathEntry to the classpath prefix
+ *
+ * @param classPathEntry
+ */
+ void addToPrefix(final String classPathEntry) {
+ this.prefix.add(classPathEntry);
+ }
+
+ /**
+ * Adds all entries given using the <code>add()</code> method.
+ *
+ * @param entries
+ */
+ void addAll(final String... entries) {
+ for (final String classPathEntry : entries) {
+ this.add(classPathEntry);
+ }
+ }
+
+ /**
+ * @return the suffix in an immutable list.
+ */
+ List<String> getSuffixAsImmutableList() {
+ return Collections.unmodifiableList(new ArrayList<>(this.suffix));
+ }
+
+ /**
+ * @return the prefix in an immutable list.
+ */
+ List<String> getPrefixAsImmutableList() {
+ return Collections.unmodifiableList(new ArrayList<>(this.prefix));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/YarnClasspathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/YarnClasspathProvider.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/YarnClasspathProvider.java
new file mode 100644
index 0000000..c5a83a3
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/YarnClasspathProvider.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn;
+
+import net.jcip.annotations.Immutable;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.util.OSUtils;
+
+import javax.inject.Inject;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Access to the classpath according to the REEF file system standard.
+ */
+@Immutable
+public final class YarnClasspathProvider implements RuntimeClasspathProvider {
+ private static final Logger LOG = Logger.getLogger(YarnClasspathProvider.class.getName());
+ private static final Level CLASSPATH_LOG_LEVEL = Level.FINE;
+
+ private static final String YARN_TOO_OLD_MESSAGE = "The version of YARN you are using is too old to support classpath assembly. Reverting to legacy method.";
+ private static final String HADOOP_CONF_DIR = OSUtils.formatVariable("HADOOP_CONF_DIR");
+ private static final String HADOOP_HOME = OSUtils.formatVariable("HADOOP_HOME");
+ private static final String HADOOP_COMMON_HOME = OSUtils.formatVariable("HADOOP_COMMON_HOME");
+ private static final String HADOOP_YARN_HOME = OSUtils.formatVariable("HADOOP_YARN_HOME");
+ private static final String HADOOP_HDFS_HOME = OSUtils.formatVariable("HADOOP_HDFS_HOME");
+ private static final String HADOOP_MAPRED_HOME = OSUtils.formatVariable("HADOOP_MAPRED_HOME");
+
+ // Used when we can't get a classpath from YARN
+ private static final String[] LEGACY_CLASSPATH_LIST = new String[]{
+ HADOOP_CONF_DIR,
+ HADOOP_HOME + "/*",
+ HADOOP_HOME + "/lib/*",
+ HADOOP_COMMON_HOME + "/*",
+ HADOOP_COMMON_HOME + "/lib/*",
+ HADOOP_YARN_HOME + "/*",
+ HADOOP_YARN_HOME + "/lib/*",
+ HADOOP_HDFS_HOME + "/*",
+ HADOOP_HDFS_HOME + "/lib/*",
+ HADOOP_MAPRED_HOME + "/*",
+ HADOOP_MAPRED_HOME + "/lib/*",
+ HADOOP_HOME + "/etc/hadoop",
+ HADOOP_HOME + "/share/hadoop/common/*",
+ HADOOP_HOME + "/share/hadoop/common/lib/*",
+ HADOOP_HOME + "/share/hadoop/yarn/*",
+ HADOOP_HOME + "/share/hadoop/yarn/lib/*",
+ HADOOP_HOME + "/share/hadoop/hdfs/*",
+ HADOOP_HOME + "/share/hadoop/hdfs/lib/*",
+ HADOOP_HOME + "/share/hadoop/mapreduce/*",
+ HADOOP_HOME + "/share/hadoop/mapreduce/lib/*"
+ };
+ private final List<String> classPathPrefix;
+ private final List<String> classPathSuffix;
+
+ @Inject
+ YarnClasspathProvider(final YarnConfiguration yarnConfiguration) {
+ boolean needsLegacyClasspath = false; // will be set to true below whenever we encounter issues with the YARN Configuration
+ final ClassPathBuilder builder = new ClassPathBuilder();
+
+ try {
+ // Add the classpath actually configured on this cluster
+ final String[] yarnClassPath = yarnConfiguration.getTrimmedStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
+ if (null == yarnClassPath || yarnClassPath.length == 0) {
+ needsLegacyClasspath = true;
+ } else {
+ builder.addAll(yarnClassPath);
+ }
+ final String[] yarnDefaultClassPath = YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH;
+ if (null == yarnDefaultClassPath || yarnDefaultClassPath.length == 0) {
+ LOG.log(Level.SEVERE, "YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH is empty. This indicates a broken cluster configuration");
+ needsLegacyClasspath = true;
+ } else {
+ builder.addAll(yarnDefaultClassPath);
+ }
+ } catch (final NoSuchFieldError e) {
+ // This means that one of the static fields above aren't actually in YarnConfiguration.
+ // The reason for that is most likely that we encounter a really old version of YARN.
+ needsLegacyClasspath = true;
+ LOG.log(Level.SEVERE, YARN_TOO_OLD_MESSAGE);
+ }
+
+ if (needsLegacyClasspath) {
+ builder.addAll(LEGACY_CLASSPATH_LIST);
+ }
+
+ this.classPathPrefix = builder.getPrefixAsImmutableList();
+ this.classPathSuffix = builder.getSuffixAsImmutableList();
+ this.logClasspath();
+ }
+
+ @Override
+ public List<String> getDriverClasspathPrefix() {
+ return this.classPathPrefix;
+ }
+
+ @Override
+ public List<String> getDriverClasspathSuffix() {
+ return this.classPathSuffix;
+ }
+
+ @Override
+ public List<String> getEvaluatorClasspathPrefix() {
+ return this.classPathPrefix;
+ }
+
+ @Override
+ public List<String> getEvaluatorClasspathSuffix() {
+ return this.classPathSuffix;
+ }
+
+
+ private void logClasspath() {
+ if (LOG.isLoggable(CLASSPATH_LOG_LEVEL)) {
+ final StringBuilder message = new StringBuilder("Classpath:\n\t");
+ message.append(StringUtils.join(classPathPrefix, "\n\t"));
+ message.append("\n--------------------------------\n\t");
+ message.append(StringUtils.join(classPathSuffix, "\n\t"));
+ LOG.log(CLASSPATH_LOG_LEVEL, message.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
new file mode 100644
index 0000000..c893dde
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.client.REEF;
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.runtime.common.client.REEFImplementation;
+import org.apache.reef.runtime.common.client.RunningJobImpl;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.client.parameters.JobPriority;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.util.logging.LoggingSetup;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * A ConfigurationModule for the YARN resourcemanager.
+ */
+@Public
+@ClientSide
+public class YarnClientConfiguration extends ConfigurationModuleBuilder {
+ static {
+ LoggingSetup.setupCommonsLogging();
+ }
+
+ public static final OptionalParameter<String> YARN_QUEUE_NAME = new OptionalParameter<>();
+ public static final OptionalParameter<Integer> YARN_PRIORITY = new OptionalParameter<>();
+
+ public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
+
+ public static final ConfigurationModule CONF = new YarnClientConfiguration()
+ // Bind the common resourcemanager
+ .bindImplementation(REEF.class, REEFImplementation.class)
+ .bindImplementation(RunningJob.class, RunningJobImpl.class)
+ // Bind the message codec for REEF.
+ .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+ // Bind YARN
+ .bindImplementation(JobSubmissionHandler.class, YarnJobSubmissionHandler.class)
+ // Bind the parameters given by the user
+ .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
+ .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
+ .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+ .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+ // Bind external constructors. Taken from YarnExternalConstructors.registerClientConstructors
+ .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
+ .build();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
new file mode 100644
index 0000000..fd2bb69
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.JobJarMaker;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
+import org.apache.reef.runtime.yarn.util.YarnTypes;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.tang.types.NamedParameterNode;
+import org.apache.reef.tang.util.ReflectionUtilities;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Private
+@ClientSide
+final class YarnJobSubmissionHandler implements JobSubmissionHandler {
+
+ private static final Logger LOG = Logger.getLogger(YarnJobSubmissionHandler.class.getName());
+
+ private final YarnConfiguration yarnConfiguration;
+ private final YarnClient yarnClient;
+ private final JobJarMaker jobJarMaker;
+ private final REEFFileNames filenames;
+ private final ClasspathProvider classpath;
+ private final FileSystem fileSystem;
+ private final ConfigurationSerializer configurationSerializer;
+ private final double jvmSlack;
+
+ @Inject
+ YarnJobSubmissionHandler(
+ final YarnConfiguration yarnConfiguration,
+ final JobJarMaker jobJarMaker,
+ final REEFFileNames filenames,
+ final ClasspathProvider classpath,
+ final ConfigurationSerializer configurationSerializer,
+ final @Parameter(JVMHeapSlack.class) double jvmSlack) throws IOException {
+
+ this.yarnConfiguration = yarnConfiguration;
+ this.jobJarMaker = jobJarMaker;
+ this.filenames = filenames;
+ this.classpath = classpath;
+ this.configurationSerializer = configurationSerializer;
+ this.jvmSlack = jvmSlack;
+
+ this.fileSystem = FileSystem.get(yarnConfiguration);
+
+ this.yarnClient = YarnClient.createYarnClient();
+ this.yarnClient.init(this.yarnConfiguration);
+ this.yarnClient.start();
+ }
+
+ @Override
+ public void close() {
+ this.yarnClient.stop();
+ }
+
+ @Override
+ public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+
+ LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionProto.getIdentifier());
+
+ try {
+
+ LOG.log(Level.FINE, "Requesting Application ID from YARN.");
+
+ final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
+ final GetNewApplicationResponse applicationResponse = yarnClientApplication.getNewApplicationResponse();
+
+ final ApplicationSubmissionContext applicationSubmissionContext =
+ yarnClientApplication.getApplicationSubmissionContext();
+
+ final ApplicationId applicationId = applicationSubmissionContext.getApplicationId();
+
+ LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
+
+ // set the application name
+ applicationSubmissionContext.setApplicationName(
+ "reef-job-" + jobSubmissionProto.getIdentifier());
+
+ LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
+
+ final Path submissionFolder = new Path(
+ "/tmp/" + this.filenames.getJobFolderPrefix() + applicationId.getId() + "/");
+
+ final Configuration driverConfiguration =
+ makeDriverConfiguration(jobSubmissionProto, submissionFolder);
+
+ final File jobSubmissionFile =
+ this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
+
+ final Path uploadedJobJarPath = this.uploadToJobFolder(jobSubmissionFile, submissionFolder);
+
+ final Map<String, LocalResource> resources = new HashMap<>(1);
+ resources.put(this.filenames.getREEFFolderName(),
+ this.makeLocalResourceForJarFile(uploadedJobJarPath));
+
+ // SET MEMORY RESOURCE
+ final int amMemory = getMemory(
+ jobSubmissionProto, applicationResponse.getMaximumResourceCapability().getMemory());
+ applicationSubmissionContext.setResource(Resource.newInstance(amMemory, 1));
+
+ // SET EXEC COMMAND
+ final List<String> launchCommand = new JavaLaunchCommandBuilder()
+ .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
+ .setLaunchID(jobSubmissionProto.getIdentifier())
+ .setConfigurationFileName(this.filenames.getDriverConfigurationPath())
+ .setClassPath(this.classpath.getDriverClasspath())
+ .setMemory(amMemory)
+ .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName())
+ .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName())
+ .build();
+
+ applicationSubmissionContext.setAMContainerSpec(
+ YarnTypes.getContainerLaunchContext(launchCommand, resources));
+
+ applicationSubmissionContext.setPriority(getPriority(jobSubmissionProto));
+
+ // Set the queue to which this application is to be submitted in the RM
+ applicationSubmissionContext.setQueue(getQueue(jobSubmissionProto, "default"));
+ LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", applicationId);
+
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
+ }
+
+ // TODO: this is currently being developed on a hacked 2.4.0 bits, should be 2.4.1
+ final String minVersionKeepContainerOptionAvailable = "2.4.0";
+
+ // when supported, set KeepContainersAcrossApplicationAttempts to be true
+ // so that when driver (AM) crashes, evaluators will still be running and we can recover later.
+ if (YarnTypes.isAtOrAfterVersion(minVersionKeepContainerOptionAvailable)) {
+ LOG.log(
+ Level.FINE,
+ "Hadoop version is {0} or after with KeepContainersAcrossApplicationAttempts supported, will set it to true.",
+ minVersionKeepContainerOptionAvailable);
+
+ applicationSubmissionContext.setKeepContainersAcrossApplicationAttempts(true);
+ }
+
+ this.yarnClient.submitApplication(applicationSubmissionContext);
+
+ } catch (final YarnException | IOException e) {
+ throw new RuntimeException("Unable to submit Driver to YARN.", e);
+ }
+ }
+
+ /**
+ * Assembles the Driver configuration.
+ */
+ private Configuration makeDriverConfiguration(
+ final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+ final Path jobFolderPath) throws IOException {
+ Configuration config = this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration());
+ final String userBoundJobSubmissionDirectory = config.getNamedParameter((NamedParameterNode<?>) config.getClassHierarchy().getNode(ReflectionUtilities.getFullName(DriverJobSubmissionDirectory.class)));
+ LOG.log(Level.FINE, "user bound job submission Directory: " + userBoundJobSubmissionDirectory);
+ final String finalJobFolderPath =
+ (userBoundJobSubmissionDirectory == null || userBoundJobSubmissionDirectory.isEmpty())
+ ? jobFolderPath.toString() : userBoundJobSubmissionDirectory;
+ return Configurations.merge(
+ YarnDriverConfiguration.CONF
+ .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, finalJobFolderPath)
+ .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionProto.getIdentifier())
+ .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionProto.getRemoteId())
+ .set(YarnDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+ .build(),
+ this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()));
+ }
+
+ private final Path uploadToJobFolder(final File file, final Path jobFolder) throws IOException {
+ final Path source = new Path(file.getAbsolutePath());
+ final Path destination = new Path(jobFolder, file.getName());
+ LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
+ this.fileSystem.copyFromLocalFile(false, true, source, destination);
+ return destination;
+ }
+
+ private Priority getPriority(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+ return Priority.newInstance(
+ jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() : 0);
+ }
+
+ /**
+ * Extract the queue name from the jobSubmissionProto or return default if none is set.
+ * <p/>
+ * TODO: Revisit this. We also have a named parameter for the queue in YarnClientConfiguration.
+ */
+ private final String getQueue(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+ final String defaultQueue) {
+ return jobSubmissionProto.hasQueue() && !jobSubmissionProto.getQueue().isEmpty() ?
+ jobSubmissionProto.getQueue() : defaultQueue;
+ }
+
+ /**
+ * Extract the desired driver memory from jobSubmissionProto.
+ * <p/>
+ * returns maxMemory if that desired amount is more than maxMemory
+ */
+ private int getMemory(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+ final int maxMemory) {
+ final int amMemory;
+ final int requestedMemory = jobSubmissionProto.getDriverMemory();
+ if (requestedMemory <= maxMemory) {
+ amMemory = requestedMemory;
+ } else {
+ LOG.log(Level.WARNING,
+ "Requested {0}MB of memory for the driver. " +
+ "The max on this YARN installation is {1}. " +
+ "Using {1} as the memory for the driver.",
+ new Object[]{requestedMemory, maxMemory});
+ amMemory = maxMemory;
+ }
+ return amMemory;
+ }
+
+ /**
+ * Creates a LocalResource instance for the JAR file referenced by the given Path
+ */
+ private LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
+ final LocalResource localResource = Records.newRecord(LocalResource.class);
+ final FileStatus status = FileContext.getFileContext(fileSystem.getUri()).getFileStatus(path);
+ localResource.setType(LocalResourceType.ARCHIVE);
+ localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
+ localResource.setTimestamp(status.getModificationTime());
+ localResource.setSize(status.getLen());
+ return localResource;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobPriority.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobPriority.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobPriority.java
new file mode 100644
index 0000000..a14ad98
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobPriority.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The prioroty of the submitted job.
+ */
+@NamedParameter(doc = "The job priority.", default_value = "0", short_name = "yarn_priority")
+public final class JobPriority implements Name<Integer> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobQueue.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobQueue.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobQueue.java
new file mode 100644
index 0000000..6017a55
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/JobQueue.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The queue to submit a job to.
+ */
+@NamedParameter(doc = "The job queue.", default_value = "default", short_name = "yarn_queue")
+public final class JobQueue implements Name<String> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ApplicationMasterRegistration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ApplicationMasterRegistration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ApplicationMasterRegistration.java
new file mode 100644
index 0000000..63194e7
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ApplicationMasterRegistration.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+
+/**
+ * Helper class that holds on to the AM registration.
+ */
+final class ApplicationMasterRegistration {
+
+ private Optional<RegisterApplicationMasterResponse> registration = Optional.empty();
+
+ @Inject
+ ApplicationMasterRegistration() {
+ }
+
+ /**
+ * @return the registered registration.
+ */
+ synchronized RegisterApplicationMasterResponse getRegistration() {
+ return registration.get();
+ }
+
+ /**
+ * Set the registration information. This is a set-once field.
+ *
+ * @param registration
+ */
+ synchronized void setRegistration(final RegisterApplicationMasterResponse registration) {
+ if (this.isPresent()) {
+ throw new RuntimeException("Trying to re-register the AM");
+ }
+ this.registration = Optional.of(registration);
+ }
+
+ /**
+ * @return true, if a registration was set.
+ */
+ synchronized boolean isPresent() {
+ return this.registration.isPresent();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ContainerRequestCounter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ContainerRequestCounter.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ContainerRequestCounter.java
new file mode 100644
index 0000000..59860e2
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/ContainerRequestCounter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Used to keep track of resource requests.
+ */
+@Private
+@DriverSide
+final class ContainerRequestCounter {
+ private static final Logger LOG = Logger.getLogger(ContainerRequestCounter.class.getName());
+
+ private int counter = 0;
+
+ @Inject
+ ContainerRequestCounter() {
+ LOG.log(Level.FINEST, "Instantiated 'ContainerRequestCounter'");
+ }
+
+ /**
+ * Increment the counter by the given amount.
+ *
+ * @param number
+ */
+ synchronized void incrementBy(final int number) {
+ this.counter += number;
+ }
+
+ /**
+ * @return the current value of the counter.
+ */
+ synchronized int get() {
+ return this.counter;
+ }
+
+ /**
+ * Decrement the counter by 1.
+ */
+ synchronized void decrement() {
+ if (this.counter <= 0) {
+ LOG.log(Level.WARNING, "requestedContainerCount cannot go below 0");
+ this.counter = 0;
+ } else {
+ this.counter -= 1;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/Containers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/Containers.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/Containers.java
new file mode 100644
index 0000000..b3f4ac1
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/Containers.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Helper class that manages the set of Containers we know about.
+ */
+@Private
+@DriverSide
+final class Containers {
+ private final Map<String, Container> containers = new ConcurrentHashMap<>();
+
+ @Inject
+ Containers() {
+ }
+
+ /**
+ * @param containerID
+ * @return the container for the given id.
+ * @throws java.lang.RuntimeException when the container is unknown.
+ */
+ synchronized Container get(final String containerID) {
+ final Container result = this.containers.get(containerID);
+ if (null == result) {
+ throw new RuntimeException("Requesting an unknown container: " + containerID);
+ }
+ return result;
+ }
+
+ /**
+ * Registers the given container
+ *
+ * @param container
+ * @throws java.lang.RuntimeException if a container with the same ID had been registered before.
+ */
+ synchronized void add(final Container container) {
+ final String containerId = container.getId().toString();
+ if (this.hasContainer(containerId)) {
+ throw new RuntimeException("Trying to add a Container that is already known: " + containerId);
+ }
+ this.containers.put(containerId, container);
+ }
+
+ /**
+ * Removes the container with the given ID.
+ *
+ * @param containerId
+ * @return the container that was registered before.
+ * @throws java.lang.RuntimeException if no such container existed.
+ */
+ synchronized Container removeAndGet(final String containerId) {
+ final Container result = this.containers.remove(containerId);
+ if (null == result) {
+ throw new RuntimeException("Unknown container to remove: " + containerId);
+ }
+ return result;
+ }
+
+ /**
+ * @param containerId
+ * @return true, if a container with this ID is known.
+ */
+ synchronized boolean hasContainer(final String containerId) {
+ return this.containers.containsKey(containerId);
+ }
+
+
+ /**
+ * @param containerId
+ * @return the Container stored under this containerId or an empty Optional.
+ */
+ synchronized Optional<Container> getOptional(final String containerId) {
+ return Optional.ofNullable(this.containers.get(containerId));
+ }
+
+ /**
+ * @return an Iterable of all the known container Ids.
+ */
+ synchronized Iterable<String> getContainerIds() {
+ return new ArrayList(this.containers.keySet());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/DefaultTrackingURLProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/DefaultTrackingURLProvider.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/DefaultTrackingURLProvider.java
new file mode 100644
index 0000000..c11b2b8
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/DefaultTrackingURLProvider.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import javax.inject.Inject;
+
+final class DefaultTrackingURLProvider implements TrackingURLProvider {
+
+ @Inject
+ DefaultTrackingURLProvider() {
+ }
+
+ @Override
+ public String getTrackingUrl() {
+ return "";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
new file mode 100644
index 0000000..3a2ff5c
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.files.JobJarMaker;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the global files on the driver and can produce the needed FileResources for container submissions.
+ */
+@DriverSide
+final class EvaluatorSetupHelper {
+
+ private static final Logger LOG = Logger.getLogger(EvaluatorSetupHelper.class.getName());
+
+ private final REEFFileNames fileNames;
+ private final ConfigurationSerializer configurationSerializer;
+ private final TempFileCreator tempFileCreator;
+ private final UploaderToJobFolder uploader;
+ private final GlobalJarUploader globalJarUploader;
+ private final boolean deleteTempFiles;
+
+ @Inject
+ EvaluatorSetupHelper(
+ final REEFFileNames fileNames,
+ final ConfigurationSerializer configurationSerializer,
+ final TempFileCreator tempFileCreator,
+ final @Parameter(DeleteTempFiles.class) boolean deleteTempFiles,
+ final UploaderToJobFolder uploader,
+ final GlobalJarUploader globalJarUploader) throws IOException {
+ this.tempFileCreator = tempFileCreator;
+ this.deleteTempFiles = deleteTempFiles;
+ this.globalJarUploader = globalJarUploader;
+
+ this.fileNames = fileNames;
+ this.configurationSerializer = configurationSerializer;
+ this.uploader = uploader;
+ }
+
+ /**
+ * @return the map to be used in formulating the evaluator launch submission.
+ */
+ Map<String, LocalResource> getGlobalResources() {
+ try {
+ return this.globalJarUploader.call();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to upload the global JAR file to the job folder.", e);
+ }
+ }
+
+
+ /**
+ * Sets up the LocalResources for a new Evaluator.
+ *
+ * @param resourceLaunchProto
+ * @return
+ * @throws IOException
+ */
+ Map<String, LocalResource> getResources(
+ final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
+ throws IOException {
+
+ final Map<String, LocalResource> result = new HashMap<>();
+ result.putAll(getGlobalResources());
+
+ final File localStagingFolder = this.tempFileCreator.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix());
+
+ // Write the configuration
+ final File configurationFile = new File(localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
+ this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchProto), configurationFile);
+
+ // Copy files to the staging folder
+ JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder);
+
+ // Make a JAR file out of it
+ final File localFile = tempFileCreator.createTempFile(
+ this.fileNames.getEvaluatorFolderPrefix(), this.fileNames.getJarFileSuffix());
+ new JARFileMaker(localFile).addChildren(localStagingFolder).close();
+
+ // Upload the JAR to the job folder
+ final Path pathToEvaluatorJar = this.uploader.uploadToJobFolder(localFile);
+ result.put(this.fileNames.getLocalFolderPath(), this.uploader.makeLocalResourceForJarFile(pathToEvaluatorJar));
+
+ if (this.deleteTempFiles) {
+ LOG.log(Level.FINE, "Marking [{0}] for deletion at the exit of this JVM and deleting [{1}]",
+ new Object[]{localFile.getAbsolutePath(), localStagingFolder.getAbsolutePath()});
+ localFile.deleteOnExit();
+ localStagingFolder.delete();
+ } else {
+ LOG.log(Level.FINE, "The evaluator staging folder will be kept at [{0}], the JAR at [{1}]",
+ new Object[]{localFile.getAbsolutePath(), localStagingFolder.getAbsolutePath()});
+ }
+ return result;
+ }
+
+ /**
+ * Assembles the configuration for an Evaluator.
+ *
+ * @param resourceLaunchProto
+ * @return
+ * @throws IOException
+ */
+
+ private Configuration makeEvaluatorConfiguration(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto)
+ throws IOException {
+ return Tang.Factory.getTang()
+ .newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf()))
+ .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
new file mode 100644
index 0000000..c6ee91e
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/GlobalJarUploader.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Utility class that creates the JAR file with the global files on the driver and then uploads it to the job folder on
+ * (H)DFS.
+ */
+final class GlobalJarUploader implements Callable<Map<String, LocalResource>> {
+
+ /**
+ * Used for the file system constants.
+ */
+ private final REEFFileNames fileNames;
+ /**
+ * This will hold the actuall map to be used as the "global" resources when submitting Evaluators.
+ */
+ private final Map<String, LocalResource> globalResources = new HashMap<>(1);
+ /**
+ * Utility to actually perform the update.
+ */
+ private final UploaderToJobFolder uploader;
+ /**
+ * True, if globalResources contains the valid information which is cached after the first call to call().
+ */
+ private boolean isDone;
+
+ @Inject
+ GlobalJarUploader(final REEFFileNames fileNames,
+ final UploaderToJobFolder uploader) {
+ this.fileNames = fileNames;
+ this.uploader = uploader;
+ }
+
+ /**
+ * Creates the JAR file with the global files on the driver and then uploads it to the job folder on
+ * (H)DFS.
+ *
+ * @return the map to be used as the "global" resources when submitting Evaluators.
+ * @throws IOException if the creation of the JAR or the upload fails
+ */
+ @Override
+ public synchronized Map<String, LocalResource> call() throws IOException {
+ if (!this.isDone) {
+ final Path pathToGlobalJar = this.uploader.uploadToJobFolder(makeGlobalJar());
+ globalResources.put(this.fileNames.getGlobalFolderPath(),
+ this.uploader.makeLocalResourceForJarFile(pathToGlobalJar));
+ this.isDone = true;
+ }
+ return this.globalResources;
+ }
+
+ /**
+ * Creates the JAR file for upload.
+ *
+ * @return
+ * @throws IOException
+ */
+ private File makeGlobalJar() throws IOException {
+ final File jarFile = new File(this.fileNames.getGlobalFolderName() + this.fileNames.getJarFileSuffix());
+ new JARFileMaker(jarFile).addChildren(this.fileNames.getGlobalFolder()).close();
+ return jarFile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
new file mode 100644
index 0000000..ec43666
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Helper that represents the REEF layer to the YARN runtime.
+ */
+// This is a great place to add a thread boundary, should that need arise.
+@Private
+final class REEFEventHandlers implements AutoCloseable {
+ private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> resourceAllocationHandler;
+ private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler;
+ private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandler;
+ private final EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorProtoEventHandler;
+
+ @Inject
+ REEFEventHandlers(final @Parameter(RuntimeParameters.NodeDescriptorHandler.class) EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorProtoEventHandler,
+ final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusProtoEventHandler,
+ final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> resourceAllocationHandler,
+ final @Parameter(RuntimeParameters.ResourceStatusHandler.class) EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler) {
+ this.resourceAllocationHandler = resourceAllocationHandler;
+ this.resourceStatusHandler = resourceStatusHandler;
+ this.runtimeStatusHandler = runtimeStatusProtoEventHandler;
+ this.nodeDescriptorProtoEventHandler = nodeDescriptorProtoEventHandler;
+ }
+
+ /**
+ * Inform reef of a node.
+ *
+ * @param nodeDescriptorProto
+ */
+ void onNodeDescriptor(final DriverRuntimeProtocol.NodeDescriptorProto nodeDescriptorProto) {
+ this.nodeDescriptorProtoEventHandler.onNext(nodeDescriptorProto);
+ }
+
+ /**
+ * Update REEF's view on the runtime status.
+ *
+ * @param runtimeStatusProto
+ */
+ void onRuntimeStatus(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+ this.runtimeStatusHandler.onNext(runtimeStatusProto);
+ }
+
+ /**
+ * Inform REEF of a fresh resource allocation.
+ *
+ * @param resourceAllocationProto
+ */
+ void onResourceAllocation(final DriverRuntimeProtocol.ResourceAllocationProto resourceAllocationProto) {
+ this.resourceAllocationHandler.onNext(resourceAllocationProto);
+ }
+
+ /**
+ * Update REEF on a change to the status of a resource.
+ *
+ * @param resourceStatusProto
+ */
+ void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+ this.resourceStatusHandler.onNext(resourceStatusProto);
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Empty, but here for a future where we need to close a threadpool
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/TrackingURLProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/TrackingURLProvider.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/TrackingURLProvider.java
new file mode 100644
index 0000000..bbb72a6
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/TrackingURLProvider.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * Implement this interface to set the tracking URL reported to YARN.
+ */
+@DefaultImplementation(DefaultTrackingURLProvider.class)
+public interface TrackingURLProvider {
+ public String getTrackingUrl();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
new file mode 100644
index 0000000..afc1d9c
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/UploaderToJobfolder.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Uploads files to the current job folder.
+ */
+final class UploaderToJobFolder {
+ private static final Logger LOG = Logger.getLogger(UploaderToJobFolder.class.getName());
+
+ /**
+ * The path on (H)DFS which is used as the job's folder.
+ */
+ private final String jobSubmissionDirectory;
+ /**
+ * The FileSystem instance to use for fs operations.
+ */
+ private final FileSystem fileSystem;
+
+ @Inject
+ UploaderToJobFolder(final @Parameter(JobSubmissionDirectory.class) String jobSubmissionDirectory,
+ final YarnConfiguration yarnConfiguration) throws IOException {
+ this.jobSubmissionDirectory = jobSubmissionDirectory;
+ this.fileSystem = FileSystem.get(yarnConfiguration);
+ }
+
+ /**
+ * Uploads the given file to the job folder on (H)DFS.
+ *
+ * @param file
+ * @return
+ * @throws java.io.IOException
+ */
+ Path uploadToJobFolder(final File file) throws IOException {
+ final Path source = new Path(file.getAbsolutePath());
+ final Path destination = new Path(this.jobSubmissionDirectory + "/" + file.getName());
+ LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
+ this.fileSystem.copyFromLocalFile(false, true, source, destination);
+ return destination;
+ }
+
+ /**
+ * Creates a LocalResource instance for the JAR file referenced by the given Path
+ *
+ * @param path
+ * @return
+ * @throws IOException
+ */
+ LocalResource makeLocalResourceForJarFile(final Path path) throws IOException {
+ final LocalResource localResource = Records.newRecord(LocalResource.class);
+ final FileStatus status = FileContext.getFileContext(this.fileSystem.getUri()).getFileStatus(path);
+ localResource.setType(LocalResourceType.ARCHIVE);
+ localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
+ localResource.setTimestamp(status.getModificationTime());
+ localResource.setSize(status.getLen());
+ return localResource;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
new file mode 100644
index 0000000..073cf8c
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
+import org.apache.reef.runtime.common.launch.LaunchCommandBuilder;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.util.YarnTypes;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Resource launch handler for YARN.
+ */
+public final class YARNResourceLaunchHandler implements ResourceLaunchHandler {
+
+ private static final Logger LOG = Logger.getLogger(YARNResourceLaunchHandler.class.getName());
+
+ private final Containers containers;
+ private final InjectionFuture<YarnContainerManager> yarnContainerManager;
+ private final EvaluatorSetupHelper evaluatorSetupHelper;
+ private final REEFFileNames filenames;
+ private final ClasspathProvider classpath;
+ private final double jvmHeapFactor;
+
+ @Inject
+ YARNResourceLaunchHandler(final Containers containers,
+ final InjectionFuture<YarnContainerManager> yarnContainerManager,
+ final EvaluatorSetupHelper evaluatorSetupHelper,
+ final REEFFileNames filenames,
+ final ClasspathProvider classpath,
+ final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) {
+ this.jvmHeapFactor = 1.0 - jvmHeapSlack;
+ LOG.log(Level.FINEST, "Instantiating 'YARNResourceLaunchHandler'");
+ this.containers = containers;
+ this.yarnContainerManager = yarnContainerManager;
+ this.evaluatorSetupHelper = evaluatorSetupHelper;
+ this.filenames = filenames;
+ this.classpath = classpath;
+ LOG.log(Level.FINE, "Instantiated 'YARNResourceLaunchHandler'");
+ }
+
+ @Override
+ public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) {
+ try {
+
+ final String containerId = resourceLaunchProto.getIdentifier();
+ LOG.log(Level.FINEST, "TIME: Start ResourceLaunchProto {0}", containerId);
+ final Container container = this.containers.get(containerId);
+ LOG.log(Level.FINEST, "Setting up container launch container for id={0}", container.getId());
+ final Map<String, LocalResource> localResources =
+ this.evaluatorSetupHelper.getResources(resourceLaunchProto);
+
+ final LaunchCommandBuilder commandBuilder;
+ switch (resourceLaunchProto.getType()) {
+ case JVM:
+ commandBuilder = new JavaLaunchCommandBuilder()
+ .setClassPath(this.classpath.getEvaluatorClasspath());
+ break;
+ case CLR:
+ commandBuilder = new CLRLaunchCommandBuilder();
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported container type: " + resourceLaunchProto.getType());
+ }
+
+ final List<String> command = commandBuilder
+ .setErrorHandlerRID(resourceLaunchProto.getRemoteId())
+ .setLaunchID(resourceLaunchProto.getIdentifier())
+ .setConfigurationFileName(this.filenames.getEvaluatorConfigurationPath())
+ .setMemory((int) (this.jvmHeapFactor * container.getResource().getMemory()))
+ .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getEvaluatorStderrFileName())
+ .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getEvaluatorStdoutFileName())
+ .build();
+
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST,
+ "TIME: Run ResourceLaunchProto {0} command: `{1}` with resources: `{2}`",
+ new Object[]{containerId, StringUtils.join(command, ' '), localResources});
+ }
+
+ final ContainerLaunchContext ctx = YarnTypes.getContainerLaunchContext(command, localResources);
+ this.yarnContainerManager.get().submit(container, ctx);
+
+ LOG.log(Level.FINEST, "TIME: End ResourceLaunchProto {0}", containerId);
+
+ } catch (final Throwable e) {
+ LOG.log(Level.WARNING, "Error handling resource launch message: " + resourceLaunchProto, e);
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
new file mode 100644
index 0000000..dda9fb3
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.tang.InjectionFuture;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * ResourceReleaseHandler for YARN.
+ */
+public final class YARNResourceReleaseHandler implements ResourceReleaseHandler {
+
+ private static final Logger LOG = Logger.getLogger(YARNResourceReleaseHandler.class.getName());
+
+ private final InjectionFuture<YarnContainerManager> yarnContainerManager;
+
+ @Inject
+ YARNResourceReleaseHandler(final InjectionFuture<YarnContainerManager> yarnContainerManager) {
+ this.yarnContainerManager = yarnContainerManager;
+ LOG.log(Level.FINE, "Instantiated 'YARNResourceReleaseHandler'");
+ }
+
+ @Override
+ public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto resourceReleaseProto) {
+ final String containerId = resourceReleaseProto.getIdentifier();
+ LOG.log(Level.FINEST, "Releasing container {0}", containerId);
+ this.yarnContainerManager.get().release(containerId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
new file mode 100644
index 0000000..c8ab5b2
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+
+/**
+ * Handler of RuntimeStart for the YARN Runtime.
+ */
+public final class YARNRuntimeStartHandler implements EventHandler<RuntimeStart> {
+
+ private final YarnContainerManager yarnContainerManager;
+
+ @Inject
+ public YARNRuntimeStartHandler(final YarnContainerManager yarnContainerManager) {
+ this.yarnContainerManager = yarnContainerManager;
+ }
+
+ @Override
+ public void onNext(final RuntimeStart runtimeStart) {
+ this.yarnContainerManager.onStart();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
new file mode 100644
index 0000000..b79c906
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+
+/**
+ * Shuts down the YARN resource manager.
+ */
+public final class YARNRuntimeStopHandler implements EventHandler<RuntimeStop> {
+
+ private final YarnContainerManager yarnContainerManager;
+
+ @Inject
+ YARNRuntimeStopHandler(final YarnContainerManager yarnContainerManager) {
+ this.yarnContainerManager = yarnContainerManager;
+ }
+
+ @Override
+ public void onNext(final RuntimeStop runtimeStop) {
+ this.yarnContainerManager.onStop();
+ }
+}