You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/07/14 09:12:14 UTC
incubator-reef git commit: [REEF-454] Allow for per-job queues
Repository: incubator-reef
Updated Branches:
refs/heads/master f632869a5 -> ac1555b19
[REEF-454] Allow for per-job queues
This adds `YarnDriverConfiguration` that can be filled out by an application and
merged into their DriverConfiguration to set the queue to submit the job to.
This also makes sure the `YarnClientConfiguration.YARN_QUEUE_NAME` is actually
used (it wasn't). It now provides the default for when the new per-job setting
isn't used.
Lastly, this change deprecates `JobSubmissionEvent.getQueue()` which was never
used in the first place. Its purpose is now served by the mechanism introduced
here.
JIRA:
[REEF-454](https://issues.apache.org/jira/browse/REEF-454)
Pull Request:
This closes #279
Author: Markus Weimer <we...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ac1555b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ac1555b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ac1555b1
Branch: refs/heads/master
Commit: ac1555b19c715f3ebb3cb6c59561b2df1ff22a4a
Parents: f632869
Author: Markus Weimer <we...@apache.org>
Authored: Mon Jul 6 17:44:59 2015 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Tue Jul 14 00:06:02 2015 -0700
----------------------------------------------------------------------
.../common/client/api/JobSubmissionEvent.java | 2 +
.../client/api/JobSubmissionEventImpl.java | 2 +
.../yarn/client/YarnDriverConfiguration.java | 46 ++++++++++++++++++++
.../yarn/client/YarnJobSubmissionHandler.java | 35 +++++++++------
4 files changed, 72 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
index 76f9877..7bf3123 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java
@@ -76,6 +76,8 @@ public interface JobSubmissionEvent {
/**
* @return Queue to submit the Job to
+ * @deprecated in 0.12. Use org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead.
*/
+ @Deprecated
Optional<String> getQueue();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
index d63f881..3a6e9dd 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java
@@ -184,7 +184,9 @@ public final class JobSubmissionEventImpl implements JobSubmissionEvent {
/**
* @see JobSubmissionEvent#getQueue()
+ * @deprecated in 0.12. Use org.apache.reef.runtime.yarn.client.YarnDriverConfiguration#QUEUE instead.
*/
+ @Deprecated
public Builder setQueue(final String queue) {
this.queue = queue;
return this;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java
new file mode 100644
index 0000000..9975f36
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnDriverConfiguration.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.client;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+
+/**
+ * Additional YARN-Specific configuration options to be merged with DriverConfiguration.
+ */
+@Public
+@ClientSide
+public final class YarnDriverConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * The queue to submit this Driver to.
+ */
+ public static final OptionalParameter<String> QUEUE = new OptionalParameter<>();
+
+ /**
+ * ConfigurationModule to set YARN-Specific configuration options to be merged with DriverConfiguration.
+ */
+ public static final ConfigurationModule CONF = new YarnDriverConfiguration()
+ .bindNamedParameter(JobQueue.class, QUEUE)
+ .build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ac1555b1/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
index 0eb7246..a4e3412 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java
@@ -31,6 +31,7 @@ import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.JobJarMaker;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.client.parameters.JobQueue;
import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
import org.apache.reef.runtime.yarn.client.uploader.JobUploader;
import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration;
@@ -39,7 +40,6 @@ import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import javax.inject.Inject;
@@ -48,8 +48,6 @@ import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
-import static org.apache.reef.util.Optional.*;
-
@Private
@ClientSide
final class YarnJobSubmissionHandler implements JobSubmissionHandler {
@@ -60,9 +58,9 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
private final JobJarMaker jobJarMaker;
private final REEFFileNames fileNames;
private final ClasspathProvider classpath;
- private final ConfigurationSerializer configurationSerializer;
private final JobUploader uploader;
private final double jvmSlack;
+ private final String defaultQueueName;
@Inject
YarnJobSubmissionHandler(
@@ -70,17 +68,17 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
final JobJarMaker jobJarMaker,
final REEFFileNames fileNames,
final ClasspathProvider classpath,
- final ConfigurationSerializer configurationSerializer,
final JobUploader uploader,
- @Parameter(JVMHeapSlack.class) final double jvmSlack) throws IOException {
+ @Parameter(JVMHeapSlack.class) final double jvmSlack,
+ @Parameter(JobQueue.class) String defaultQueueName) throws IOException {
this.yarnConfiguration = yarnConfiguration;
this.jobJarMaker = jobJarMaker;
this.fileNames = fileNames;
this.classpath = classpath;
- this.configurationSerializer = configurationSerializer;
this.uploader = uploader;
this.jvmSlack = jvmSlack;
+ this.defaultQueueName = defaultQueueName;
}
@Override
@@ -110,7 +108,7 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
.setApplicationName(jobSubmissionEvent.getIdentifier())
.setDriverMemory(jobSubmissionEvent.getDriverMemory().get())
.setPriority(getPriority(jobSubmissionEvent))
- .setQueue(getQueue(jobSubmissionEvent, "default"))
+ .setQueue(getQueue(jobSubmissionEvent))
.submit(jobSubmissionEvent.getRemoteId());
LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionEvent.getIdentifier());
@@ -141,12 +139,23 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler {
/**
* Extract the queue name from the jobSubmissionEvent or return default if none is set.
- * <p/>
- * TODO: Revisit this. We also have a named parameter for the queue in YarnClientConfiguration.
*/
- private String getQueue(final JobSubmissionEvent jobSubmissionEvent,
- final String defaultQueue) {
- return jobSubmissionEvent.getQueue().orElse(defaultQueue);
+ private String getQueue(final JobSubmissionEvent jobSubmissionEvent) {
+ return getQueue(jobSubmissionEvent.getConfiguration());
+ }
+
+ /**
+ * Extracts the queue name from the driverConfiguration or return default if none is set.
+ *
+ * @param driverConfiguration
+ * @return the queue name from the driverConfiguration or return default if none is set.
+ */
+ private String getQueue(final Configuration driverConfiguration) {
+ try {
+ return Tang.Factory.getTang().newInjector(driverConfiguration).getNamedInstance(JobQueue.class);
+ } catch (final Throwable t) {
+ return this.defaultQueueName;
+ }
}
private static Optional<String> getUserBoundJobSubmissionDirectory(final Configuration configuration) {