You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2017/01/31 07:24:39 UTC
reef git commit: [REEF-1686] Create a variant of
YarnClientConfiguration for Unmanaged AM
Repository: reef
Updated Branches:
refs/heads/master b0626ee47 -> b8d2bad86
[REEF-1686] Create a variant of YarnClientConfiguration for Unmanaged AM
Summary of changes:
* Create Driver and Client configuration modules for the Unmanaged AM app, and add extra parameters required for that mode;
* Implement Unmanaged AM app submission functionality;
* Make `YarnDriverConfigurationProviderImpl` class public so that Unmanaged AM configuration can refer to it.
JIRA: [REEF-1686](https://issues.apache.org/jira/browse/REEF-1686)
This closes #1243
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b8d2bad8
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b8d2bad8
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b8d2bad8
Branch: refs/heads/master
Commit: b8d2bad8643bf3bbbee27bbd20d8799fc2046967
Parents: b0626ee
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Wed Jan 25 16:25:35 2017 -0800
Committer: Julia Wang <jw...@yahoo.com>
Committed: Mon Jan 30 19:36:47 2017 -0800
----------------------------------------------------------------------
.../YarnDriverConfigurationProviderImpl.java | 5 +-
.../yarn/client/parameters/RootFolder.java | 35 +++++
.../UnmanagedAmYarnClientConfiguration.java | 72 ++++++++++
.../UnmanagedAmYarnDriverConfiguration.java | 93 +++++++++++++
.../UnmanagedAmYarnJobSubmissionHandler.java | 117 ++++++++++++++++
.../UnmanagedAmYarnSubmissionHelper.java | 139 +++++++++++++++++++
.../client/unmanaged/UnmanagedDriverFiles.java | 78 +++++++++++
.../yarn/client/unmanaged/package-info.java | 22 +++
8 files changed, 560 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
index 5509cd1..37f6634 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfigurationProviderImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.reef.runtime.yarn.client;
+import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier;
@@ -33,7 +34,9 @@ import static org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration.*;
/**
* Default driver configuration provider for yarn runtime.
*/
-final class YarnDriverConfigurationProviderImpl implements DriverConfigurationProvider {
+@Private
+public final class YarnDriverConfigurationProviderImpl implements DriverConfigurationProvider {
+
private final double jvmSlack;
@Inject
http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java
new file mode 100644
index 0000000..5f27fc9
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/parameters/RootFolder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Named parameter: directory in the local filesystem holding resources to run the driver and submit evaluators.
+ */
+@NamedParameter(doc = "Folder to hold resources of the Unmanaged Driver",
+ default_value = RootFolder.DEFAULT_VALUE, short_name = "root_folder")
+public final class RootFolder implements Name<String> {
+ /** Folder name referenced as the {@code default_value} of this named parameter. */
+ public static final String DEFAULT_VALUE = "REEF_LOCAL_RUNTIME";
+
+ /** Private constructor: this class only names a parameter and is not meant to be instantiated. */
+ private RootFolder() { }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
new file mode 100644
index 0000000..7862ad9
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnClientConfiguration.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.client.parameters.DriverConfigurationProviders;
+import org.apache.reef.driver.parameters.DriverIsUnmanaged;
+import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration;
+import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.client.YarnDriverConfigurationProviderImpl;
+import org.apache.reef.runtime.yarn.client.parameters.JobPriority;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.runtime.yarn.client.parameters.RootFolder;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.ConfigurationProvider;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.util.logging.LoggingSetup;
+
+/**
+ * Client-side ConfigurationModule for submitting a job whose Driver runs as an Unmanaged AM under YARN.
+ */
+@Public
+@ClientSide
+public class UnmanagedAmYarnClientConfiguration extends ConfigurationModuleBuilder {
+
+ static {
+ LoggingSetup.setupCommonsLogging();
+ }
+ // Optional user parameters; CONF binds them to JobQueue, JobPriority and RootFolder, respectively.
+ public static final OptionalParameter<String> YARN_QUEUE_NAME = new OptionalParameter<>();
+ public static final OptionalParameter<Integer> YARN_PRIORITY = new OptionalParameter<>();
+ public static final OptionalParameter<String> ROOT_FOLDER = new OptionalParameter<>();
+
+ /** Configuration providers whose Configurations will be merged into all Driver Configurations. */
+ public static final OptionalImpl<ConfigurationProvider> DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>();
+
+ public static final ConfigurationModule CONF = new UnmanagedAmYarnClientConfiguration()
+ .merge(CommonRuntimeConfiguration.CONF)
+ .bindNamedParameter(DriverIsUnmanaged.class, "true")
+ // Bind the Unmanaged AM job submission handler and the YARN Driver configuration provider
+ .bindImplementation(JobSubmissionHandler.class, UnmanagedAmYarnJobSubmissionHandler.class)
+ .bindImplementation(DriverConfigurationProvider.class, YarnDriverConfigurationProviderImpl.class)
+ // Bind the parameters given by the user
+ .bindNamedParameter(JobQueue.class, YARN_QUEUE_NAME)
+ .bindNamedParameter(JobPriority.class, YARN_PRIORITY)
+ .bindNamedParameter(RootFolder.class, ROOT_FOLDER)
+ .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+ // Bind external constructors. Taken from YarnExternalConstructors.registerClientConstructors
+ .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class)
+ .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
new file mode 100644
index 0000000..2a57a97
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnDriverConfiguration.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.driver.parameters.DriverIsUnmanaged;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.runtime.common.driver.api.*;
+import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
+import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorTimeout;
+import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.launch.REEFErrorHandler;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.driver.*;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * Build configuration for REEF driver running in unmanaged mode under the YARN resource manager.
+ */
+@Public
+@DriverSide
+public final class UnmanagedAmYarnDriverConfiguration extends ConfigurationModuleBuilder {
+ // Required: JOB_IDENTIFIER is bound to both LaunchID and JobIdentifier in CONF below.
+ public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>();
+ public static final RequiredParameter<String> JOB_SUBMISSION_DIRECTORY = new RequiredParameter<>();
+ // Optional: CLIENT_REMOTE_IDENTIFIER is bound to both ClientRemoteIdentifier and ErrorHandlerRID in CONF below.
+ public static final OptionalParameter<Integer> YARN_HEARTBEAT_INTERVAL = new OptionalParameter<>();
+ public static final OptionalImpl<RackNameFormatter> RACK_NAME_FORMATTER = new OptionalImpl<>();
+ public static final OptionalParameter<Long> EVALUATOR_TIMEOUT = new OptionalParameter<>();
+ public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>();
+ public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
+
+ public static final ConfigurationModule CONF =
+ new UnmanagedAmYarnDriverConfiguration()
+ .bindNamedParameter(LaunchID.class, JOB_IDENTIFIER)
+ .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER)
+ .bindNamedParameter(JobSubmissionDirectory.class, JOB_SUBMISSION_DIRECTORY)
+ // Wake RemoteConfiguration parameters: manager name, error handler and message codec
+ .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_UNMANAGED_DRIVER")
+ .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
+ .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+ // YARN runtime: mark the driver as unmanaged and bind the YARN resource handlers
+ .bindNamedParameter(DriverIsUnmanaged.class, "true")
+ .bindImplementation(ResourceLaunchHandler.class, YARNResourceLaunchHandler.class)
+ .bindImplementation(ResourceReleaseHandler.class, YARNResourceReleaseHandler.class)
+ .bindImplementation(ResourceRequestHandler.class, YarnResourceRequestHandler.class)
+ .bindImplementation(ResourceManagerStartHandler.class, YARNRuntimeStartHandler.class)
+ .bindImplementation(ResourceManagerStopHandler.class, YARNRuntimeStopHandler.class)
+ .bindConstructor(YarnConfiguration.class, YarnConfigurationConstructor.class)
+ .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+ .bindNamedParameter(YarnHeartbeatPeriod.class, YARN_HEARTBEAT_INTERVAL)
+ // AbstractDriverRuntime parameters
+ .bindNamedParameter(EvaluatorTimeout.class, EVALUATOR_TIMEOUT)
+ .bindNamedParameter(ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER)
+ .bindNamedParameter(ErrorHandlerRID.class, CLIENT_REMOTE_IDENTIFIER)
+ .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+ .bindImplementation(RackNameFormatter.class, RACK_NAME_FORMATTER)
+ .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+ .bindNamedParameter(DefinedRuntimes.class, RuntimeIdentifier.RUNTIME_NAME)
+ .build();
+
+ /** Private constructor: instantiated only by this class, to build {@link #CONF}. */
+ private UnmanagedAmYarnDriverConfiguration() { }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
new file mode 100644
index 0000000..d75a2b5
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnJobSubmissionHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
+import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+/** Client-side handler that submits a REEF job to YARN with the Driver running as an Unmanaged AM. */
+@Private
+@ClientSide
+final class UnmanagedAmYarnJobSubmissionHandler implements JobSubmissionHandler {
+ private static final Logger LOG = Logger.getLogger(UnmanagedAmYarnJobSubmissionHandler.class.getName());
+
+ private final String defaultQueueName;
+ private final UnmanagedDriverFiles driverFiles;
+ private final UnmanagedAmYarnSubmissionHelper submissionHelper;
+
+ /** Application ID reported by the submission helper; null until a submission has succeeded. */
+ private String applicationId = null;
+
+ /** Injectable constructor: creates the YARN submission helper eagerly and rethrows its failures unchecked. */
+ @Inject
+ private UnmanagedAmYarnJobSubmissionHandler(
+ @Parameter(JobQueue.class) final String defaultQueueName,
+ final UnmanagedDriverFiles driverFiles,
+ final YarnConfiguration yarnConfiguration,
+ final SecurityTokenProvider tokenProvider) {
+ this.defaultQueueName = defaultQueueName;
+ this.driverFiles = driverFiles;
+ try {
+ this.submissionHelper = new UnmanagedAmYarnSubmissionHelper(yarnConfiguration, tokenProvider);
+ } catch (final IOException | YarnException ex) {
+ LOG.log(Level.SEVERE, "Cannot create YARN client", ex);
+ throw new RuntimeException("Cannot create YARN client", ex);
+ }
+ }
+
+ /** Close the underlying YARN submission helper. */
+ @Override
+ public void close() {
+ this.submissionHelper.close();
+ }
+
+ /**
+ * Copy the global files of the job to the local driver folder and submit the Unmanaged AM application to YARN.
+ * @throws RuntimeException if the files cannot be copied or the submission to YARN fails.
+ */
+ @Override
+ public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
+ final String jobId = jobSubmissionEvent.getIdentifier();
+ LOG.log(Level.FINEST, "Submitting UNMANAGED AM job: {0}", jobSubmissionEvent);
+ try {
+ this.driverFiles.copyGlobalsFrom(jobSubmissionEvent);
+ this.submissionHelper
+ .setApplicationName(jobId)
+ .setPriority(jobSubmissionEvent.getPriority().orElse(0))
+ .setQueue(getQueue(jobSubmissionEvent))
+ .submit();
+ this.applicationId = this.submissionHelper.getStringApplicationId();
+ LOG.log(Level.FINER, "Submitted UNMANAGED AM job with ID {0} :: {1}", new Object[] {jobId, this.applicationId});
+ } catch (final IOException | YarnException ex) {
+ throw new RuntimeException("Unable to submit UNMANAGED Driver to YARN: " + jobId, ex);
+ }
+ }
+
+ /**
+ * Get the RM application ID.
+ * Return null if the application has not been submitted yet, or was submitted unsuccessfully.
+ * @return string application ID or null if no app has been submitted yet.
+ */
+ @Override
+ public String getApplicationId() {
+ return this.applicationId;
+ }
+
+ /**
+ * Extract the queue name from the jobSubmissionEvent or return default if none is set.
+ * NOTE(review): fallback happens only when injection fails - confirm JobQueue has no default value of its own.
+ */
+ private String getQueue(final JobSubmissionEvent jobSubmissionEvent) {
+ try {
+ return Tang.Factory.getTang().newInjector(
+ jobSubmissionEvent.getConfiguration()).getNamedInstance(JobQueue.class);
+ } catch (final InjectionException e) {
+ return this.defaultQueueName;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
new file mode 100644
index 0000000..d25dbad
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedAmYarnSubmissionHelper.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.reef.runtime.yarn.client.SecurityTokenProvider;
+import org.apache.reef.runtime.yarn.client.UserCredentialSecurityTokenProvider;
+import org.apache.reef.runtime.yarn.util.YarnTypes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper code that wraps the YARN Client API for our purposes.
+ */
+final class UnmanagedAmYarnSubmissionHelper implements AutoCloseable {
+
+ private static final Logger LOG = Logger.getLogger(UnmanagedAmYarnSubmissionHelper.class.getName());
+
+ private final SecurityTokenProvider tokenProvider;
+ private final YarnClient yarnClient;
+ private final ApplicationSubmissionContext applicationSubmissionContext;
+ private final ApplicationId applicationId;
+
+ /**
+ * Start a YARN client and obtain from YARN a new application (and its ID) marked as Unmanaged AM.
+ * If anything fails after the client has been started, the client is stopped before the exception propagates.
+ */
+ UnmanagedAmYarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
+ final SecurityTokenProvider tokenProvider) throws IOException, YarnException {
+ this.tokenProvider = tokenProvider;
+
+ LOG.log(Level.FINE, "Initializing YARN Client");
+ this.yarnClient = YarnClient.createYarnClient();
+ this.yarnClient.init(yarnConfiguration);
+ this.yarnClient.start();
+ LOG.log(Level.FINE, "Initialized YARN Client");
+ try {
+ LOG.log(Level.FINE, "Requesting UNMANAGED Application ID from YARN.");
+ final ContainerLaunchContext launchContext = YarnTypes.getContainerLaunchContext(
+ Collections.<String>emptyList(), Collections.<String, LocalResource>emptyMap(), tokenProvider.getTokens());
+ final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
+ this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
+ this.applicationSubmissionContext.setAMContainerSpec(launchContext);
+ this.applicationSubmissionContext.setUnmanagedAM(true);
+ this.applicationId = this.applicationSubmissionContext.getApplicationId();
+ } catch (final IOException | YarnException | RuntimeException ex) {
+ this.yarnClient.stop(); // already started: do not leak it when construction fails
+ throw ex;
+ }
+ LOG.log(Level.INFO, "YARN UNMANAGED Application ID: {0}", this.applicationId);
+ }
+
+ /**
+ * @return the application ID assigned by YARN.
+ */
+ String getStringApplicationId() {
+ return this.applicationId.toString();
+ }
+
+ /**
+ * Set the name of the application to be submitted.
+ * @param applicationName YARN application name - a human-readable string.
+ * @return reference to self for chain calls.
+ */
+ UnmanagedAmYarnSubmissionHelper setApplicationName(final String applicationName) {
+ this.applicationSubmissionContext.setApplicationName(applicationName);
+ return this;
+ }
+
+ /**
+ * Set the priority of the job.
+ * @param priority YARN application priority.
+ * @return reference to self for chain calls.
+ */
+ UnmanagedAmYarnSubmissionHelper setPriority(final int priority) {
+ this.applicationSubmissionContext.setPriority(Priority.newInstance(priority));
+ return this;
+ }
+
+ /**
+ * Assign this job submission to a queue.
+ * @param queueName YARN queue name.
+ * @return reference to self for chain calls.
+ */
+ UnmanagedAmYarnSubmissionHelper setQueue(final String queueName) {
+ this.applicationSubmissionContext.setQueue(queueName);
+ return this;
+ }
+
+ /** Submit the application to YARN, then fetch its AMRM token and add it to the token provider. */
+ void submit() throws IOException, YarnException {
+ LOG.log(Level.INFO, "Submitting REEF Application with UNMANAGED AM to YARN. ID: {0}", this.applicationId);
+ this.yarnClient.submitApplication(this.applicationSubmissionContext);
+ final Token<AMRMTokenIdentifier> token = this.yarnClient.getAMRMToken(this.applicationId);
+ this.tokenProvider.addTokens(UserCredentialSecurityTokenProvider.serializeToken(token));
+ }
+
+ /** Log the final application report (only when FINER logging is enabled) and stop the YARN client. */
+ @Override
+ public void close() {
+ if (LOG.isLoggable(Level.FINER)) {
+ try {
+ final ApplicationReport appReport = this.yarnClient.getApplicationReport(this.applicationId);
+ LOG.log(Level.FINER, "Application {0} final attempt {1} status: {2}/{3}", new Object[] {
+ this.applicationId, appReport.getCurrentApplicationAttemptId(),
+ appReport.getYarnApplicationState(), appReport.getFinalApplicationStatus() });
+ } catch (final IOException | YarnException ex) {
+ LOG.log(Level.WARNING, "Cannot get final status of Unmanaged AM app: " + this.applicationId, ex);
+ }
+ }
+ LOG.log(Level.FINE, "Closing Unmanaged AM YARN application: {0}", this.applicationId);
+ this.yarnClient.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java
new file mode 100644
index 0000000..e573a61
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/UnmanagedDriverFiles.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;
+
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.files.FileResource;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.parameters.RootFolder;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the files added to a driver.
+ */
+final class UnmanagedDriverFiles {
+ private static final Logger LOG = Logger.getLogger(UnmanagedDriverFiles.class.getName());
+
+ private final String rootFolderName;
+ private final REEFFileNames fileNames;
+
+ @Inject
+ private UnmanagedDriverFiles(
+ @Parameter(RootFolder.class) final String rootFolderName,
+ final REEFFileNames fileNames) {
+ this.rootFolderName = rootFolderName;
+ this.fileNames = fileNames;
+ }
+
+ /**
+ * Symlink (or, failing that, copy) every global file of the job into the global folder under the root folder.
+ * @param jobSubmissionEvent job whose global file set is linked or copied.
+ * @throws IOException if a file can be neither linked nor copied.
+ * @throws RuntimeException if the destination folder cannot be created.
+ */
+ public void copyGlobalsFrom(final JobSubmissionEvent jobSubmissionEvent) throws IOException {
+ final File reefGlobalPath = new File(this.rootFolderName, this.fileNames.getGlobalFolderPath());
+ if (!reefGlobalPath.exists() && !reefGlobalPath.mkdirs()) {
+ LOG.log(Level.WARNING, "Failed to create directory: {0}", reefGlobalPath);
+ throw new RuntimeException("Failed to create directory: " + reefGlobalPath);
+ }
+ reefGlobalPath.deleteOnExit(); // NOTE: deletion succeeds only if the directory is empty at JVM exit
+ for (final FileResource fileResource : jobSubmissionEvent.getGlobalFileSet()) {
+ final File sourceFile = new File(fileResource.getPath());
+ final File destinationFile = new File(reefGlobalPath, sourceFile.getName());
+ LOG.log(Level.FINEST, "Copy file: {0} -> {1}", new Object[] {sourceFile, destinationFile});
+ try {
+ // Link to the absolute path: a relative link target would be resolved against the link's own directory.
+ Files.createSymbolicLink(destinationFile.toPath(), sourceFile.toPath().toAbsolutePath());
+ } catch (final IOException ex) {
+ LOG.log(Level.FINER, "Can't symlink file " + sourceFile + ", copying instead.", ex);
+ Files.copy(sourceFile.toPath(), destinationFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/b8d2bad8/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java
new file mode 100644
index 0000000..ec6f0d8
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/unmanaged/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Client-side event handlers for YARN resourcemanager and AM running in unmanaged mode.
+ */
+package org.apache.reef.runtime.yarn.client.unmanaged;