You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/12/08 00:04:53 UTC
reef git commit: [REEF-1681] Allow REEF Driver to run in unmanaged AM mode
Repository: reef
Updated Branches:
refs/heads/master 7c04284ed -> eb66b2e9d
[REEF-1681] Allow REEF Driver to run in unmanaged AM mode
* Introduce `DriverIsUnmanaged` config parameter
* Call `ApplicationSubmissionContext.setUnmanagedAM()` when submitting application request to YARN
* Minor refactoring for readability
JIRA:
[REEF-1681](https://issues.apache.org/jira/browse/REEF-1681)
Pull request:
This closes #
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/eb66b2e9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/eb66b2e9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/eb66b2e9
Branch: refs/heads/master
Commit: eb66b2e9d473418b8df9415b26809016c526114a
Parents: 7c04284
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Tue Dec 6 18:25:12 2016 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Wed Dec 7 15:51:37 2016 -0800
----------------------------------------------------------------------
.../bridge/client/YarnJobSubmissionClient.java | 19 ++++++-----
.../driver/parameters/DriverIsUnmanaged.java | 33 ++++++++++++++++++++
.../yarn/client/YarnJobSubmissionHandler.java | 14 ++++++---
.../yarn/client/YarnSubmissionHelper.java | 30 +++++++++---------
4 files changed, 70 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/eb66b2e9/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index e76f025..fde89cb 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.driver.parameters.DriverIsUnmanaged;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
@@ -63,29 +64,33 @@ import java.util.logging.Logger;
public final class YarnJobSubmissionClient {
private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName());
+
+ private final boolean isUnmanaged;
+ private final List<String> commandPrefixList;
private final JobUploader uploader;
private final REEFFileNames fileNames;
private final YarnConfiguration yarnConfiguration;
private final ClasspathProvider classpath;
private final SecurityTokenProvider tokenProvider;
- private final List<String> commandPrefixList;
private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
@Inject
- YarnJobSubmissionClient(final JobUploader uploader,
+ YarnJobSubmissionClient(@Parameter(DriverIsUnmanaged.class) final boolean isUnmanaged,
+ @Parameter(DriverLaunchCommandPrefix.class) final List<String> commandPrefixList,
+ final JobUploader uploader,
final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
- @Parameter(DriverLaunchCommandPrefix.class)
- final List<String> commandPrefixList,
final SecurityTokenProvider tokenProvider,
final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
+
+ this.isUnmanaged = isUnmanaged;
+ this.commandPrefixList = commandPrefixList;
this.uploader = uploader;
this.fileNames = fileNames;
this.yarnConfiguration = yarnConfiguration;
this.classpath = classpath;
this.tokenProvider = tokenProvider;
- this.commandPrefixList = commandPrefixList;
this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator;
}
@@ -109,8 +114,8 @@ public final class YarnJobSubmissionClient {
private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException {
// ------------------------------------------------------------------------
// Get an application ID
- try (final YarnSubmissionHelper submissionHelper =
- new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, tokenProvider, commandPrefixList)) {
+ try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(
+ yarnConfiguration, fileNames, classpath, tokenProvider, isUnmanaged, commandPrefixList)) {
// ------------------------------------------------------------------------
// Prepare the JAR
http://git-wip-us.apache.org/repos/asf/reef/blob/eb66b2e9/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverIsUnmanaged.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverIsUnmanaged.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverIsUnmanaged.java
new file mode 100644
index 0000000..78eea09
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverIsUnmanaged.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * If true, run Driver in unmanaged mode.
+ * (i.e. co-located with the client and not managed by the RM).
+ * This setting is false by default.
+ */
+@NamedParameter(doc = "Run driver in unmanaged mode", default_value = "false")
+public final class DriverIsUnmanaged implements Name<Boolean> {
+ private DriverIsUnmanaged() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/eb66b2e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index e2c6389..ab361bc 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.DriverIsUnmanaged;
import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
@@ -53,32 +54,35 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
private static final Logger LOG = Logger.getLogger(YarnJobSubmissionHandler.class.getName());
+ private final String defaultQueueName;
+ private final boolean isUnmanaged;
private final YarnConfiguration yarnConfiguration;
private final JobJarMaker jobJarMaker;
private final REEFFileNames fileNames;
private final ClasspathProvider classpath;
private final JobUploader uploader;
- private final String defaultQueueName;
private final SecurityTokenProvider tokenProvider;
private final DriverConfigurationProvider driverConfigurationProvider;
@Inject
YarnJobSubmissionHandler(
+ @Parameter(JobQueue.class) final String defaultQueueName,
+ @Parameter(DriverIsUnmanaged.class) final boolean isUnmanaged,
final YarnConfiguration yarnConfiguration,
final JobJarMaker jobJarMaker,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
final JobUploader uploader,
- @Parameter(JobQueue.class) final String defaultQueueName,
final SecurityTokenProvider tokenProvider,
final DriverConfigurationProvider driverConfigurationProvider) throws IOException {
+ this.defaultQueueName = defaultQueueName;
+ this.isUnmanaged = isUnmanaged;
this.yarnConfiguration = yarnConfiguration;
this.jobJarMaker = jobJarMaker;
this.fileNames = fileNames;
this.classpath = classpath;
this.uploader = uploader;
- this.defaultQueueName = defaultQueueName;
this.tokenProvider = tokenProvider;
this.driverConfigurationProvider = driverConfigurationProvider;
}
@@ -92,8 +96,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionEvent.getIdentifier());
- try (final YarnSubmissionHelper submissionHelper =
- new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, this.classpath, this.tokenProvider)) {
+ try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(
+ this.yarnConfiguration, this.fileNames, this.classpath, this.tokenProvider, this.isUnmanaged)) {
LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
final Optional<String> userBoundJobSubmissionDirectory =
http://git-wip-us.apache.org/repos/asf/reef/blob/eb66b2e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index b4841ac..278e4e8 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -41,37 +41,38 @@ import java.util.logging.Logger;
/**
* Helper code that wraps the YARN Client API for our purposes.
*/
-public final class YarnSubmissionHelper implements Closeable{
+public final class YarnSubmissionHelper implements Closeable {
+
private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName());
private final YarnClient yarnClient;
- private final YarnClientApplication yarnClientApplication;
private final GetNewApplicationResponse applicationResponse;
private final ApplicationSubmissionContext applicationSubmissionContext;
private final ApplicationId applicationId;
private final Map<String, LocalResource> resources = new HashMap<>();
- private final REEFFileNames fileNames;
private final ClasspathProvider classpath;
private final SecurityTokenProvider tokenProvider;
private final List<String> commandPrefixList;
+
private String driverStdoutFilePath;
private String driverStderrFilePath;
- private Class launcherClazz;
+ private Class launcherClazz = REEFLauncher.class;
private List<String> configurationFilePaths;
public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
final SecurityTokenProvider tokenProvider,
+ final boolean isUnmanaged,
final List<String> commandPrefixList) throws IOException, YarnException {
- this.fileNames = fileNames;
+
this.classpath = classpath;
this.driverStdoutFilePath =
- ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName();
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStdoutFileName();
this.driverStderrFilePath =
- ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStderrFileName();
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStderrFileName();
LOG.log(Level.FINE, "Initializing YARN Client");
this.yarnClient = YarnClient.createYarnClient();
@@ -80,22 +81,23 @@ public final class YarnSubmissionHelper implements Closeable{
LOG.log(Level.FINE, "Initialized YARN Client");
LOG.log(Level.FINE, "Requesting Application ID from YARN.");
- this.yarnClientApplication = this.yarnClient.createApplication();
- this.applicationResponse = this.yarnClientApplication.getNewApplicationResponse();
- this.applicationSubmissionContext = this.yarnClientApplication.getApplicationSubmissionContext();
+ final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
+ this.applicationResponse = yarnClientApplication.getNewApplicationResponse();
+ this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
+ this.applicationSubmissionContext.setUnmanagedAM(isUnmanaged);
this.applicationId = this.applicationSubmissionContext.getApplicationId();
this.tokenProvider = tokenProvider;
this.commandPrefixList = commandPrefixList;
- this.launcherClazz = REEFLauncher.class;
- this.configurationFilePaths = Collections.singletonList(this.fileNames.getDriverConfigurationPath());
+ this.configurationFilePaths = Collections.singletonList(fileNames.getDriverConfigurationPath());
LOG.log(Level.INFO, "YARN Application ID: {0}", this.applicationId);
}
public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
- final SecurityTokenProvider tokenProvider) throws IOException, YarnException {
- this(yarnConfiguration, fileNames, classpath, tokenProvider, null);
+ final SecurityTokenProvider tokenProvider,
+ final boolean isUnmanaged) throws IOException, YarnException {
+ this(yarnConfiguration, fileNames, classpath, tokenProvider, isUnmanaged, null);
}
/**