You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2017/01/31 07:24:39 UTC

reef git commit: [REEF-1686] Create a variant of YarnClientConfiguration for Unmanaged AM

Repository: reef
Updated Branches:
  refs/heads/master b0626ee47 -> b8d2bad86


[REEF-1686] Create a variant of YarnClientConfiguration for Unmanaged AM

Summary of changes:
   * Create Driver and Client configuration modules for the Unmanaged AM app, and add extra parameters required for that mode;
   * Implement Unmanaged AM app submission functionality;
   * Make `YarnDriverConfigurationProviderImpl` class public so that Unmanaged AM configuration can refer to it.

JIRA: [REEF-1686](https://issues.apache.org/jira/browse/REEF-1686)

This closes #1243


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b8d2bad8
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b8d2bad8
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b8d2bad8

Branch: refs/heads/master
Commit: b8d2bad8643bf3bbbee27bbd20d8799fc2046967
Parents: b0626ee
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Wed Jan 25 16:25:35 2017 -0800
Committer: Julia Wang <jw...@yahoo.com>
Committed: Mon Jan 30 19:36:47 2017 -0800

----------------------------------------------------------------------
 .../YarnDriverConfigurationProviderImpl.java    |   5 +-
 .../yarn/client/parameters/RootFolder.java      |  35 +++++
 .../UnmanagedAmYarnClientConfiguration.java     |  72 ++++++++++
 .../UnmanagedAmYarnDriverConfiguration.java     |  93 +++++++++++++
 .../UnmanagedAmYarnJobSubmissionHandler.java    | 117 ++++++++++++++++
 .../UnmanagedAmYarnSubmissionHelper.java        | 139 +++++++++++++++++++
 .../client/unmanaged/UnmanagedDriverFiles.java  |  78 +++++++++++
 .../yarn/client/unmanaged/package-info.java     |  22 +++
 8 files changed, 560 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
index 5509cd1..37f6634 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.runtime.yarn.client;
 
+import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
 import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier;
@@ -33,7 +34,9 @@ import static org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.*;
 /**
  * Default driver configuration provider for yarn runtime.
  */
-final class YarnDriverConfigurationProviderImpl implements DriverConfigurationProvider {
+@Private
+public final class YarnDriverConfigurationProviderImpl implements DriverConfigurationProvider {
+
   private final double jvmSlack;
 
   @Inject

http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java
new file mode 100644
index 0000000..5f27fc9
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Directory in the local filesystem to hold resources required to run the driver and submit evaluators.
+ */
+@NamedParameter(doc = "Folder to hold resources of the Unmanaged Driver",
+                default_value = RootFolder.DEFAULT_VALUE, short_name = "root_folder")
+public final class RootFolder implements Name<String> {
+
+  public static final String DEFAULT_VALUE = "REEF_LOCAL_RUNTIME";
+
+  /** Empty private constructor to prohibit instantiation of the utility class. */
+  private RootFolder() { }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
new file mode 100644
index 0000000..7862ad9
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.driver.parameters.DriverIsUnmanaged;
+import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.client.YarnDriverConfigurationProviderImpl;
+import org.apache.reef.runtime.yarn.client.parameters.JobPriority;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.runtime.yarn.client.parameters.RootFolder;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.ConfigurationProvider;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.util.logging.LoggingSetup;
+
+/**
+ * A ConfigurationModule for the YARN client that submits REEF jobs in Unmanaged AM mode.
+ */
+@Public
+@ClientSide
+public class UnmanagedAmYarnClientConfiguration extends ConfigurationModuleBuilder {
+
+  static {
+    LoggingSetup.setupCommonsLogging();
+  }
+
+  public static final OptionalParameter<String> YARN_QUEUE_NAME = new OptionalParameter<>();
+  public static final OptionalParameter<Integer> YARN_PRIORITY = new OptionalParameter<>();
+  public static final OptionalParameter<String> ROOT_FOLDER = new OptionalParameter<>();
+
+  /** Configuration providers whose Configurations will be merged into all Driver Configurations. */
+  public static final OptionalImpl<ConfigurationProvider> DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>();
+
+  public static final ConfigurationModule CONF = new UnmanagedAmYarnClientConfiguration()
+      .merge(CommonRuntimeConfiguration.CONF)
+      .bindNamedParameter(DriverIsUnmanaged.class, "true")
+      // Bind YARN
+      .bindImplementation(JobSubmissionHandler.class, UnmanagedAmYarnJobSubmissionHandler.class)
+      .bindImplementation(DriverConfigurationProvider.class, YarnDriverConfigurationProviderImpl.class)
+      // Bind the parameters given by the user
+      .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
+      .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
+      .bindNamedParameter(RootFolder.class, ROOT_FOLDER)
+      .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+      // Bind external constructors. Taken from YarnExternalConstructors.registerClientConstructors
+      .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
+      .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS)
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
new file mode 100644
index 0000000..2a57a97
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.driver.parameters.DriverIsUnmanaged;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.runtime.common.driver.api.*;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorTimeout;
+import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.launch.REEFErrorHandler;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.driver.*;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * Build configuration for REEF driver running in unmanaged mode under the YARN resource manager.
+ */
+@Public
+@DriverSide
+public final class UnmanagedAmYarnDriverConfiguration extends ConfigurationModuleBuilder {
+
+  public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>();
+  public static final RequiredParameter<String> JOB_SUBMISSION_DIRECTORY = new RequiredParameter<>();
+
+  public static final OptionalParameter<Integer> YARN_HEARTBEAT_INTERVAL = new OptionalParameter<>();
+  public static final OptionalImpl<RackNameFormatter> RACK_NAME_FORMATTER = new OptionalImpl<>();
+  public static final OptionalParameter<Long> EVALUATOR_TIMEOUT = new OptionalParameter<>();
+  public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>();
+  public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
+
+  public static final ConfigurationModule CONF =
+      new UnmanagedAmYarnDriverConfiguration()
+          .bindNamedParameter(LaunchID.class, JOB_IDENTIFIER)
+          .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER)
+          .bindNamedParameter(JobSubmissionDirectory.class, JOB_SUBMISSION_DIRECTORY)
+          // REEF client parameters
+          .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_UNMANAGED_DRIVER")
+          .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
+          .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+          // YARN runtime
+          .bindNamedParameter(DriverIsUnmanaged.class, "true")
+          .bindImplementation(ResourceLaunchHandler.class, YARNResourceLaunchHandler.class)
+          .bindImplementation(ResourceReleaseHandler.class, YARNResourceReleaseHandler.class)
+          .bindImplementation(ResourceRequestHandler.class, YarnResourceRequestHandler.class)
+          .bindImplementation(ResourceManagerStartHandler.class, YARNRuntimeStartHandler.class)
+          .bindImplementation(ResourceManagerStopHandler.class, YARNRuntimeStopHandler.class)
+          .bindConstructor(YarnConfiguration.class, YarnConfigurationConstructor.class)
+          .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+          .bindNamedParameter(YarnHeartbeatPeriod.class, YARN_HEARTBEAT_INTERVAL)
+          // AbstractDriverRuntime parameters
+          .bindNamedParameter(EvaluatorTimeout.class, EVALUATOR_TIMEOUT)
+          .bindNamedParameter(ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER)
+          .bindNamedParameter(ErrorHandlerRID.class, CLIENT_REMOTE_IDENTIFIER)
+          .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+          .bindImplementation(RackNameFormatter.class, RACK_NAME_FORMATTER)
+          .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+          .bindNamedParameter(DefinedRuntimes.class, RuntimeIdentifier.RUNTIME_NAME)
+          .build();
+
+  /** Cannot instantiate this utility class. */
+  private UnmanagedAmYarnDriverConfiguration() { }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
new file mode 100644
index 0000000..d75a2b5
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Private
+@ClientSide
+final class UnmanagedAmYarnJobSubmissionHandler implements JobSubmissionHandler {
+
+  private static final Logger LOG = Logger.getLogger(UnmanagedAmYarnJobSubmissionHandler.class.getName());
+
+  private final String defaultQueueName;
+  private final UnmanagedDriverFiles driverFiles;
+  private final UnmanagedAmYarnSubmissionHelper submissionHelper;
+
+  private String applicationId = null;
+
+  @Inject
+  private UnmanagedAmYarnJobSubmissionHandler(
+      @Parameter(JobQueue.class) final String defaultQueueName,
+      final UnmanagedDriverFiles driverFiles,
+      final YarnConfiguration yarnConfiguration,
+      final SecurityTokenProvider tokenProvider) throws IOException {
+
+    this.defaultQueueName = defaultQueueName;
+    this.driverFiles = driverFiles;
+
+    try {
+      this.submissionHelper = new UnmanagedAmYarnSubmissionHelper(yarnConfiguration, tokenProvider);
+    } catch (final IOException | YarnException ex) {
+      LOG.log(Level.SEVERE, "Cannot create YARN client", ex);
+      throw new RuntimeException("Cannot create YARN client", ex);
+    }
+  }
+
+  @Override
+  public void close() {
+    this.submissionHelper.close();
+  }
+
+  @Override
+  public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
+
+    final String jobId = jobSubmissionEvent.getIdentifier();
+    LOG.log(Level.FINEST, "Submitting UNMANAGED AM job: {0}", jobSubmissionEvent);
+
+    try {
+      this.driverFiles.copyGlobalsFrom(jobSubmissionEvent);
+
+      this.submissionHelper
+          .setApplicationName(jobId)
+          .setPriority(jobSubmissionEvent.getPriority().orElse(0))
+          .setQueue(getQueue(jobSubmissionEvent))
+          .submit();
+
+      this.applicationId = this.submissionHelper.getStringApplicationId();
+      LOG.log(Level.FINER, "Submitted UNMANAGED AM job with ID {0} :: {1}", new String[] {jobId, this.applicationId});
+
+    } catch (final IOException | YarnException ex) {
+      throw new RuntimeException("Unable to submit UNMANAGED Driver to YARN: " + jobId, ex);
+    }
+  }
+
+  /**
+   * Get the RM application ID.
+   * Return null if the application has not been submitted yet, or was submitted unsuccessfully.
+   * @return string application ID or null if no app has been submitted yet.
+   */
+  @Override
+  public String getApplicationId() {
+    return this.applicationId;
+  }
+
+  /**
+   * Extract the queue name from the jobSubmissionEvent or return default if none is set.
+   */
+  private String getQueue(final JobSubmissionEvent jobSubmissionEvent) {
+    try {
+      return Tang.Factory.getTang().newInjector(
+          jobSubmissionEvent.getConfiguration()).getNamedInstance(JobQueue.class);
+    } catch (final InjectionException e) {
+      return this.defaultQueueName;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
new file mode 100644
index 0000000..d25dbad
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
+import org.apache.reef.runtime.yarn.client.UserCredentialSecurityTokenProvider;
+import org.apache.reef.runtime.yarn.util.YarnTypes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper code that wraps the YARN Client API for our purposes.
+ */
+final class UnmanagedAmYarnSubmissionHelper implements AutoCloseable {
+
+  private static final Logger LOG = Logger.getLogger(UnmanagedAmYarnSubmissionHelper.class.getName());
+
+  private final SecurityTokenProvider tokenProvider;
+  private final YarnClient yarnClient;
+  private final ApplicationSubmissionContext applicationSubmissionContext;
+  private final ApplicationId applicationId;
+
+  UnmanagedAmYarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
+      final SecurityTokenProvider tokenProvider) throws IOException, YarnException {
+
+    this.tokenProvider = tokenProvider;
+
+    LOG.log(Level.FINE, "Initializing YARN Client");
+    this.yarnClient = YarnClient.createYarnClient();
+    this.yarnClient.init(yarnConfiguration);
+    this.yarnClient.start();
+    LOG.log(Level.FINE, "Initialized YARN Client");
+
+    LOG.log(Level.FINE, "Requesting UNMANAGED Application ID from YARN.");
+
+    final ContainerLaunchContext launchContext = YarnTypes.getContainerLaunchContext(
+        Collections.<String>emptyList(), Collections.<String, LocalResource>emptyMap(), tokenProvider.getTokens());
+
+    final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
+
+    this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
+    this.applicationSubmissionContext.setAMContainerSpec(launchContext);
+    this.applicationSubmissionContext.setUnmanagedAM(true);
+
+    this.applicationId = this.applicationSubmissionContext.getApplicationId();
+
+    LOG.log(Level.INFO, "YARN UNMANAGED Application ID: {0}", this.applicationId);
+  }
+
+  /**
+   * @return the application ID assigned by YARN.
+   */
+  String getStringApplicationId() {
+    return this.applicationId.toString();
+  }
+
+  /**
+   * Set the name of the application to be submitted.
+   * @param applicationName YARN application name - a human-readable string.
+   * @return reference to self for chain calls.
+   */
+  UnmanagedAmYarnSubmissionHelper setApplicationName(final String applicationName) {
+    this.applicationSubmissionContext.setApplicationName(applicationName);
+    return this;
+  }
+
+  /**
+   * Set the priority of the job.
+   * @param priority YARN application priority.
+   * @return reference to self for chain calls.
+   */
+  UnmanagedAmYarnSubmissionHelper setPriority(final int priority) {
+    this.applicationSubmissionContext.setPriority(Priority.newInstance(priority));
+    return this;
+  }
+
+  /**
+   * Assign this job submission to a queue.
+   * @param queueName YARN queue name.
+   * @return reference to self for chain calls.
+   */
+  UnmanagedAmYarnSubmissionHelper setQueue(final String queueName) {
+    this.applicationSubmissionContext.setQueue(queueName);
+    return this;
+  }
+
+  void submit() throws IOException, YarnException {
+
+    LOG.log(Level.INFO, "Submitting REEF Application with UNMANAGED AM to YARN. ID: {0}", this.applicationId);
+    this.yarnClient.submitApplication(this.applicationSubmissionContext);
+
+    final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId);
+    this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token));
+  }
+
+  @Override
+  public void close() {
+
+    if (LOG.isLoggable(Level.FINER)) {
+      try {
+        final ApplicationReport appReport = this.yarnClient.getApplicationReport(this.applicationId);
+        LOG.log(Level.FINER, "Application {0} final attempt {1} status: {2}/{3}", new Object[] {
+            this.applicationId, appReport.getCurrentApplicationAttemptId(),
+            appReport.getYarnApplicationState(), appReport.getFinalApplicationStatus() });
+      } catch (final IOException | YarnException ex) {
+        LOG.log(Level.WARNING, "Cannot get final status of Unmanaged AM app: " + this.applicationId, ex);
+      }
+    }
+
+    LOG.log(Level.FINE, "Closing Unmanaged AM YARN application: {0}", this.applicationId);
+    this.yarnClient.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java
new file mode 100644
index 0000000..e573a61
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.files.FileResource;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.parameters.RootFolder;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the files added to a driver.
+ */
+final class UnmanagedDriverFiles {
+
+  private static final Logger LOG = Logger.getLogger(UnmanagedDriverFiles.class.getName());
+
+  private final String rootFolderName;
+  private final REEFFileNames fileNames;
+
+  @Inject
+  private UnmanagedDriverFiles(
+      @Parameter(RootFolder.class) final String rootFolderName,
+      final REEFFileNames fileNames) {
+
+    this.rootFolderName = rootFolderName;
+    this.fileNames = fileNames;
+  }
+
+  public void copyGlobalsFrom(final JobSubmissionEvent jobSubmissionEvent) throws IOException {
+
+    final File reefGlobalPath = new File(this.rootFolderName, this.fileNames.getGlobalFolderPath());
+    if (!reefGlobalPath.exists() && !reefGlobalPath.mkdirs()) {
+      LOG.log(Level.WARNING, "Failed to create directory: {0}", reefGlobalPath);
+      throw new RuntimeException("Failed to create directory: " + reefGlobalPath);
+    }
+
+    reefGlobalPath.deleteOnExit();
+
+    for (final FileResource fileResource : jobSubmissionEvent.getGlobalFileSet()) {
+
+      final File sourceFile = new File(fileResource.getPath());
+      final File destinationFile = new File(reefGlobalPath, sourceFile.getName());
+      LOG.log(Level.FINEST, "Copy file: {0} -> {1}", new Object[] {sourceFile, destinationFile});
+
+      try {
+        Files.createSymbolicLink(destinationFile.toPath(), sourceFile.toPath());
+      } catch (final IOException ex) {
+        LOG.log(Level.FINER, "Can't symlink file " + sourceFile + ", copying instead.", ex);
+        Files.copy(sourceFile.toPath(), destinationFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java
new file mode 100644
index 0000000..ec6f0d8
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Client-side event handlers for YARN resource manager and AM running in unmanaged mode.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;