You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/07/14 09:12:14 UTC

incubator-reef git commit: [REEF-454] Allow for per-job queues

Repository: incubator-reef
Updated Branches:
  refs/heads/master f632869a5 -> ac1555b19


[REEF-454] Allow for per-job queues

This adds `YarnDriverConfiguration`, which an application can fill out and
merge into its DriverConfiguration to set the queue the job is submitted to.

This also makes sure that `YarnClientConfiguration.YARN_QUEUE_NAME` is actually
used (previously it was ignored). It now provides the default queue for jobs
that do not set the new per-job option.

Lastly, this change deprecates `JobSubmissionEvent.getQueue()`, which was never
used in the first place. Its purpose is now served by the mechanism introduced
here.

JIRA:
  [REEF-454](https://issues.apache.org/jira/browse/REEF-454)

Pull Request:
  This closes #279

Author:    Markus Weimer <we...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ac1555b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ac1555b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ac1555b1

Branch: refs/heads/master
Commit: ac1555b19c715f3ebb3cb6c59561b2df1ff22a4a
Parents: f632869
Author: Markus Weimer <we...@apache.org>
Authored: Mon Jul 6 17:44:59 2015 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Tue Jul 14 00:06:02 2015 -0700

----------------------------------------------------------------------
 .../common/client/api/JobSubmissionEvent.java   |  2 +
 .../client/api/JobSubmissionEventImpl.java      |  2 +
 .../yarn/client/YarnDriverConfiguration.java    | 46 ++++++++++++++++++++
 .../yarn/client/YarnJobSubmissionHandler.java   | 35 +++++++++------
 4 files changed, 72 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
index 76f9877..7bf3123 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
@@ -76,6 +76,8 @@ public interface JobSubmissionEvent {
 
   /**
    * @return Queue to submit the Job to
+   * @deprecated in 0.12. Use org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead.
    */
+  @Deprecated
   Optional<String> getQueue();
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
index d63f881..3a6e9dd 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
@@ -184,7 +184,9 @@ public final class JobSubmissionEventImpl implements JobSubmissionEvent {
 
     /**
      * @see JobSubmissionEvent#getQueue()
+     * @deprecated in 0.12. Use org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead.
      */
+    @Deprecated
     public Builder setQueue(final String queue) {
       this.queue = queue;
       return this;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java
new file mode 100644
index 0000000..9975f36
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+
+/**
+ * Additional YARN-specific, client-side configuration options to be merged with DriverConfiguration.
+ */
+@Public
+@ClientSide
+public final class YarnDriverConfiguration extends ConfigurationModuleBuilder {
+
+  /**
+   * The YARN queue to submit this Driver to. Optional; {@link #CONF} binds it to {@link JobQueue}.
+   */
+  public static final OptionalParameter<String> QUEUE = new OptionalParameter<>();
+
+  /**
+   * ConfigurationModule for the YARN-specific options; merge its result into the DriverConfiguration.
+   */
+  public static final ConfigurationModule CONF = new YarnDriverConfiguration()
+      .bindNamedParameter(JobQueue.class, QUEUE)
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index 0eb7246..a4e3412 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -31,6 +31,7 @@ import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.JobJarMaker;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
 import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
 import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
 import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
@@ -39,7 +40,6 @@ import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
 import org.apache.reef.util.Optional;
 
 import javax.inject.Inject;
@@ -48,8 +48,6 @@ import java.io.IOException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import static org.apache.reef.util.Optional.*;
-
 @Private
 @ClientSide
 final class YarnJobSubmissionHandler implements JobSubmissionHandler {
@@ -60,9 +58,9 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
   private final JobJarMaker jobJarMaker;
   private final REEFFileNames fileNames;
   private final ClasspathProvider classpath;
-  private final ConfigurationSerializer configurationSerializer;
   private final JobUploader uploader;
   private final double jvmSlack;
+  private final String defaultQueueName;
 
   @Inject
   YarnJobSubmissionHandler(
@@ -70,17 +68,17 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
       final JobJarMaker jobJarMaker,
       final REEFFileNames fileNames,
       final ClasspathProvider classpath,
-      final ConfigurationSerializer configurationSerializer,
       final JobUploader uploader,
-      @Parameter(JVMHeapSlack.class) final double jvmSlack) throws IOException {
+      @Parameter(JVMHeapSlack.class) final double jvmSlack,
+      @Parameter(JobQueue.class) String defaultQueueName) throws IOException {
 
     this.yarnConfiguration = yarnConfiguration;
     this.jobJarMaker = jobJarMaker;
     this.fileNames = fileNames;
     this.classpath = classpath;
-    this.configurationSerializer = configurationSerializer;
     this.uploader = uploader;
     this.jvmSlack = jvmSlack;
+    this.defaultQueueName = defaultQueueName;
   }
 
   @Override
@@ -110,7 +108,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
           .setApplicationName(jobSubmissionEvent.getIdentifier())
           .setDriverMemory(jobSubmissionEvent.getDriverMemory().get())
           .setPriority(getPriority(jobSubmissionEvent))
-          .setQueue(getQueue(jobSubmissionEvent, "default"))
+          .setQueue(getQueue(jobSubmissionEvent))
           .submit(jobSubmissionEvent.getRemoteId());
 
       LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionEvent.getIdentifier());
@@ -141,12 +139,23 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
 
   /**
    * Extract the queue name from the jobSubmissionEvent or return default if none is set.
-   * <p/>
-   * TODO: Revisit this. We also have a named parameter for the queue in YarnClientConfiguration.
    */
-  private String getQueue(final JobSubmissionEvent jobSubmissionEvent,
-                          final String defaultQueue) {
-    return jobSubmissionEvent.getQueue().orElse(defaultQueue);
+  private String getQueue(final JobSubmissionEvent jobSubmissionEvent) {
+    return getQueue(jobSubmissionEvent.getConfiguration());
+  }
+
+  /**
+   * Reads the queue bound to {@link JobQueue} in the driverConfiguration, falling back to the default queue.
+   * @param driverConfiguration the Driver configuration that may bind {@link JobQueue}.
+   * @return the queue name from the driverConfiguration, or the injected default if it cannot be read.
+   */
+  private String getQueue(final Configuration driverConfiguration) {
+    try {
+      return Tang.Factory.getTang().newInjector(driverConfiguration).getNamedInstance(JobQueue.class);
+    } catch (final Exception e) { // deliberately not Throwable: JVM Errors must propagate
+      LOG.log(Level.FINE, "Unable to read the job queue from the driver configuration; using the default.", e);
+      return this.defaultQueueName;
+    }
   }
 
   private static Optional<String> getUserBoundJobSubmissionDirectory(final Configuration configuration) {