You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/12/08 00:04:53 UTC

reef git commit: [REEF-1681] Allow REEF Driver to run in unmanaged AM mode

Repository: reef
Updated Branches:
  refs/heads/master 7c04284ed -> eb66b2e9d


[REEF-1681] Allow REEF Driver to run in unmanaged AM mode

  * Introduce `DriverIsUnmanaged` config parameter
  * Call `ApplicationSubmissionContext.setUnmanagedAM()`
    when submitting application request to YARN
  * Minor refactoring for readability

JIRA:
  [REEF-1681](https://issues.apache.org/jira/browse/REEF-1681)

Pull request:
  This closes #


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/eb66b2e9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/eb66b2e9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/eb66b2e9

Branch: refs/heads/master
Commit: eb66b2e9d473418b8df9415b26809016c526114a
Parents: 7c04284
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Tue Dec 6 18:25:12 2016 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Wed Dec 7 15:51:37 2016 -0800

----------------------------------------------------------------------
 .../bridge/client/YarnJobSubmissionClient.java  | 19 ++++++-----
 .../driver/parameters/DriverIsUnmanaged.java    | 33 ++++++++++++++++++++
 .../yarn/client/YarnJobSubmissionHandler.java   | 14 ++++++---
 .../yarn/client/YarnSubmissionHelper.java       | 30 +++++++++---------
 4 files changed, 70 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/eb66b2e9/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index e76f025..fde89cb 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.driver.parameters.DriverIsUnmanaged;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
@@ -63,29 +64,33 @@ import java.util.logging.Logger;
 public final class YarnJobSubmissionClient {
 
   private static final Logger LOG = Logger.getLogger(YarnJobSubmissionClient.class.getName());
+
+  private final boolean isUnmanaged;
+  private final List<String> commandPrefixList;
   private final JobUploader uploader;
   private final REEFFileNames fileNames;
   private final YarnConfiguration yarnConfiguration;
   private final ClasspathProvider classpath;
   private final SecurityTokenProvider tokenProvider;
-  private final List<String> commandPrefixList;
   private final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator;
 
   @Inject
-  YarnJobSubmissionClient(final JobUploader uploader,
+  YarnJobSubmissionClient(@Parameter(DriverIsUnmanaged.class) final boolean isUnmanaged,
+                          @Parameter(DriverLaunchCommandPrefix.class) final List<String> commandPrefixList,
+                          final JobUploader uploader,
                           final YarnConfiguration yarnConfiguration,
                           final REEFFileNames fileNames,
                           final ClasspathProvider classpath,
-                          @Parameter(DriverLaunchCommandPrefix.class)
-                          final List<String> commandPrefixList,
                           final SecurityTokenProvider tokenProvider,
                           final YarnSubmissionParametersFileGenerator jobSubmissionParametersGenerator) {
+
+    this.isUnmanaged = isUnmanaged;
+    this.commandPrefixList = commandPrefixList;
     this.uploader = uploader;
     this.fileNames = fileNames;
     this.yarnConfiguration = yarnConfiguration;
     this.classpath = classpath;
     this.tokenProvider = tokenProvider;
-    this.commandPrefixList = commandPrefixList;
     this.jobSubmissionParametersGenerator = jobSubmissionParametersGenerator;
   }
 
@@ -109,8 +114,8 @@ public final class YarnJobSubmissionClient {
   private void launch(final YarnClusterSubmissionFromCS yarnSubmission) throws IOException, YarnException {
     // ------------------------------------------------------------------------
     // Get an application ID
-    try (final YarnSubmissionHelper submissionHelper =
-             new YarnSubmissionHelper(yarnConfiguration, fileNames, classpath, tokenProvider, commandPrefixList)) {
+    try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(
+        yarnConfiguration, fileNames, classpath, tokenProvider, isUnmanaged, commandPrefixList)) {
 
       // ------------------------------------------------------------------------
       // Prepare the JAR

http://git-wip-us.apache.org/repos/asf/reef/blob/eb66b2e9/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverIsUnmanaged.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverIsUnmanaged.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverIsUnmanaged.java
new file mode 100644
index 0000000..78eea09
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverIsUnmanaged.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * If true, run Driver in unmanaged mode.
+ * (i.e. co-located with the client and not managed by the RM).
+ * This setting is false by default.
+ */
+@NamedParameter(doc = "Run driver in unmanaged mode", default_value = "false")
+public final class DriverIsUnmanaged implements Name<Boolean> {
+  private DriverIsUnmanaged() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/eb66b2e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index e2c6389..ab361bc 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.DriverIsUnmanaged;
 import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory;
 import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
@@ -53,32 +54,35 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
 
   private static final Logger LOG = Logger.getLogger(YarnJobSubmissionHandler.class.getName());
 
+  private final String defaultQueueName;
+  private final boolean isUnmanaged;
   private final YarnConfiguration yarnConfiguration;
   private final JobJarMaker jobJarMaker;
   private final REEFFileNames fileNames;
   private final ClasspathProvider classpath;
   private final JobUploader uploader;
-  private final String defaultQueueName;
   private final SecurityTokenProvider tokenProvider;
   private final DriverConfigurationProvider driverConfigurationProvider;
 
   @Inject
   YarnJobSubmissionHandler(
+          @Parameter(JobQueue.class) final String defaultQueueName,
+          @Parameter(DriverIsUnmanaged.class) final boolean isUnmanaged,
           final YarnConfiguration yarnConfiguration,
           final JobJarMaker jobJarMaker,
           final REEFFileNames fileNames,
           final ClasspathProvider classpath,
           final JobUploader uploader,
-          @Parameter(JobQueue.class) final String defaultQueueName,
           final SecurityTokenProvider tokenProvider,
           final DriverConfigurationProvider driverConfigurationProvider) throws IOException {
 
+    this.defaultQueueName = defaultQueueName;
+    this.isUnmanaged = isUnmanaged;
     this.yarnConfiguration = yarnConfiguration;
     this.jobJarMaker = jobJarMaker;
     this.fileNames = fileNames;
     this.classpath = classpath;
     this.uploader = uploader;
-    this.defaultQueueName = defaultQueueName;
     this.tokenProvider = tokenProvider;
     this.driverConfigurationProvider = driverConfigurationProvider;
   }
@@ -92,8 +96,8 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
 
     LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionEvent.getIdentifier());
 
-    try (final YarnSubmissionHelper submissionHelper =
-             new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, this.classpath, this.tokenProvider)) {
+    try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(
+        this.yarnConfiguration, this.fileNames, this.classpath, this.tokenProvider, this.isUnmanaged)) {
 
       LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
       final Optional<String> userBoundJobSubmissionDirectory =

http://git-wip-us.apache.org/repos/asf/reef/blob/eb66b2e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
index b4841ac..278e4e8 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java
@@ -41,37 +41,38 @@ import java.util.logging.Logger;
 /**
  * Helper code that wraps the YARN Client API for our purposes.
  */
-public final class YarnSubmissionHelper implements Closeable{
+public final class YarnSubmissionHelper implements Closeable {
+
   private static final Logger LOG = Logger.getLogger(YarnSubmissionHelper.class.getName());
 
   private final YarnClient yarnClient;
-  private final YarnClientApplication yarnClientApplication;
   private final GetNewApplicationResponse applicationResponse;
   private final ApplicationSubmissionContext applicationSubmissionContext;
   private final ApplicationId applicationId;
   private final Map<String, LocalResource> resources = new HashMap<>();
-  private final REEFFileNames fileNames;
   private final ClasspathProvider classpath;
   private final SecurityTokenProvider tokenProvider;
   private final List<String> commandPrefixList;
+
   private String driverStdoutFilePath;
   private String driverStderrFilePath;
-  private Class launcherClazz;
+  private Class launcherClazz = REEFLauncher.class;
   private List<String> configurationFilePaths;
 
   public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                               final REEFFileNames fileNames,
                               final ClasspathProvider classpath,
                               final SecurityTokenProvider tokenProvider,
+                              final boolean isUnmanaged,
                               final List<String> commandPrefixList) throws IOException, YarnException {
-    this.fileNames = fileNames;
+
     this.classpath = classpath;
 
     this.driverStdoutFilePath =
-        ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStdoutFileName();
+        ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStdoutFileName();
 
     this.driverStderrFilePath =
-        ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.fileNames.getDriverStderrFileName();
+        ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + fileNames.getDriverStderrFileName();
 
     LOG.log(Level.FINE, "Initializing YARN Client");
     this.yarnClient = YarnClient.createYarnClient();
@@ -80,22 +81,23 @@ public final class YarnSubmissionHelper implements Closeable{
     LOG.log(Level.FINE, "Initialized YARN Client");
 
     LOG.log(Level.FINE, "Requesting Application ID from YARN.");
-    this.yarnClientApplication = this.yarnClient.createApplication();
-    this.applicationResponse = this.yarnClientApplication.getNewApplicationResponse();
-    this.applicationSubmissionContext = this.yarnClientApplication.getApplicationSubmissionContext();
+    final YarnClientApplication yarnClientApplication = this.yarnClient.createApplication();
+    this.applicationResponse = yarnClientApplication.getNewApplicationResponse();
+    this.applicationSubmissionContext = yarnClientApplication.getApplicationSubmissionContext();
+    this.applicationSubmissionContext.setUnmanagedAM(isUnmanaged);
     this.applicationId = this.applicationSubmissionContext.getApplicationId();
     this.tokenProvider = tokenProvider;
     this.commandPrefixList = commandPrefixList;
-    this.launcherClazz = REEFLauncher.class;
-    this.configurationFilePaths = Collections.singletonList(this.fileNames.getDriverConfigurationPath());
+    this.configurationFilePaths = Collections.singletonList(fileNames.getDriverConfigurationPath());
     LOG.log(Level.INFO, "YARN Application ID: {0}", this.applicationId);
   }
 
   public YarnSubmissionHelper(final YarnConfiguration yarnConfiguration,
                               final REEFFileNames fileNames,
                               final ClasspathProvider classpath,
-                              final SecurityTokenProvider tokenProvider) throws IOException, YarnException {
-    this(yarnConfiguration, fileNames, classpath, tokenProvider, null);
+                              final SecurityTokenProvider tokenProvider,
+                              final boolean isUnmanaged) throws IOException, YarnException {
+    this(yarnConfiguration, fileNames, classpath, tokenProvider, isUnmanaged, null);
   }
 
   /**