You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2017/01/11 23:12:38 UTC
hive git commit: HIVE-13696: Monitor fair-scheduler.xml and
automatically update/validate jobs submitted to fair-scheduler (Reuben
Kuhnert, reviewed by Yongzhi Chen and Mohit Sabharwal)
Repository: hive
Updated Branches:
refs/heads/master 58247c563 -> 323c835e5
HIVE-13696: Monitor fair-scheduler.xml and automatically update/validate jobs submitted to fair-scheduler (Reuben Kuhnert, reviewed by Yongzhi Chen and Mohit Sabharwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/323c835e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/323c835e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/323c835e
Branch: refs/heads/master
Commit: 323c835e52ded7fe7a195e20d722a08edd247b83
Parents: 58247c5
Author: Sergio Pena <se...@cloudera.com>
Authored: Wed Jan 11 17:11:34 2017 -0600
Committer: Sergio Pena <se...@cloudera.com>
Committed: Wed Jan 11 17:11:34 2017 -0600
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 11 ++
.../hive/ql/session/YarnFairScheduling.java | 70 +++++++++++
.../service/cli/session/HiveSessionImpl.java | 8 +-
.../apache/hadoop/hive/shims/SchedulerShim.java | 2 +
shims/scheduler/pom.xml | 7 +-
.../schshim/FairSchedulerQueueAllocator.java | 93 ++++++++++++++
.../hadoop/hive/schshim/FairSchedulerShim.java | 96 ++++++++++-----
.../hadoop/hive/schshim/QueueAllocator.java | 37 ++++++
.../TestFairSchedulerQueueAllocator.java | 120 +++++++++++++++++++
9 files changed, 409 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index fd6020b..a367f27 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.session.YarnFairScheduling;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
import org.apache.hadoop.hive.shims.Utils;
@@ -539,6 +540,8 @@ public class Driver implements CommandProcessor {
plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());
}
+ configureScheduling(conf, userName);
+
//do the authorization check
if (!sem.skipAuthorization() &&
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
@@ -663,6 +666,14 @@ public class Driver implements CommandProcessor {
}
}
+ private HiveConf configureScheduling(HiveConf configuration, String forUser) throws IOException, HiveException {
+ if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(configuration)) { // doAs off and fair-scheduler queue mapping on
+ YarnFairScheduling.validateYarnQueue(configuration, forUser);
+ }
+ // Returns the same HiveConf instance it was given; the call site added in this patch ignores the result.
+ return configuration;
+ }
+
private ImmutableMap<String, Long> dumpMetaCallTimingWithoutEx(String phase) {
try {
return Hive.get().dumpAndClearMetaCallTiming(phase);
http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
new file mode 100644
index 0000000..e3ba47c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+import java.io.IOException;
+
+/**
+ * A front handle for managing job submission to Yarn-FairScheduler.
+ */
+public class YarnFairScheduling {
+ /**
+ * Determine if jobs can be configured for YARN fair scheduling.
+ * @param conf - the current HiveConf configuration; a null value yields false.
+ * @return true only when conf is non-null, HIVE_SERVER2_ENABLE_DOAS is false and HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE is true.
+ */
+ public static boolean usingNonImpersonationModeWithFairScheduling(HiveConf conf) {
+ return (conf != null)
+ && (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)
+ && (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)));
+ }
+
+ /**
+ * Configure the default YARN queue for the user via the scheduler shim. Only legal when {@link #usingNonImpersonationModeWithFairScheduling} holds; otherwise checkState raises IllegalStateException.
+ * @param conf - The current HiveConf configuration.
+ * @param forUser - The user to configure scheduling for.
+ * @throws IOException propagated from the scheduler shim's refreshDefaultQueue call.
+ * @throws HiveException NOTE(review): no statement in this method is seen to throw it; confirm whether the declaration is still needed.
+ */
+ public static void setDefaultJobQueue(HiveConf conf, String forUser) throws IOException, HiveException {
+ Preconditions.checkState(usingNonImpersonationModeWithFairScheduling(conf),
+ "Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled.");
+
+ ShimLoader.getSchedulerShims().refreshDefaultQueue(conf, forUser);
+ }
+
+ /**
+ * Validate the YARN queue configuration in conf for the given user via the scheduler shim. Only legal when {@link #usingNonImpersonationModeWithFairScheduling} holds; otherwise checkState raises IllegalStateException.
+ * @param conf - The current HiveConf configuration.
+ * @param forUser - The user whose queue configuration is validated.
+ * @throws IOException propagated from the scheduler shim's validateQueueConfiguration call.
+ * @throws HiveException NOTE(review): no statement in this method is seen to throw it; confirm whether the declaration is still needed.
+ */
+ public static void validateYarnQueue(HiveConf conf, String forUser) throws IOException, HiveException {
+ Preconditions.checkState(usingNonImpersonationModeWithFairScheduling(conf),
+ "Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled.");
+
+ ShimLoader.getSchedulerShims().validateQueueConfiguration(conf, forUser);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index f939a93..460a26b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.YarnFairScheduling;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -130,11 +131,10 @@ public class HiveSessionImpl implements HiveSession {
try {
// In non-impersonation mode, map scheduler queue to current user
// if fair scheduler is configured.
- if (! sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
- sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
- ShimLoader.getHadoopShims().refreshDefaultQueue(sessionConf, username);
+ if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(sessionConf)) {
+ YarnFairScheduling.setDefaultJobQueue(sessionConf, username);
}
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.warn("Error setting scheduler queue: " + e, e);
}
// Set an explicit session name to control the download directory name
http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
index 63803b8..f88e192 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
@@ -34,4 +34,6 @@ public interface SchedulerShim {
*/
public void refreshDefaultQueue(Configuration conf, String userName)
throws IOException;
+
+ public void validateQueueConfiguration(Configuration configuration, String forUser) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/shims/scheduler/pom.xml b/shims/scheduler/pom.xml
index 9141c1e..4242973 100644
--- a/shims/scheduler/pom.xml
+++ b/shims/scheduler/pom.xml
@@ -77,7 +77,12 @@
<version>${hadoop.version}</version>
<optional>true</optional>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito-all.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<version>${hadoop.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java
new file mode 100644
index 0000000..0e32ff0
--- /dev/null
+++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.schshim;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FairSchedulerQueueAllocator implements QueueAllocator {
+ private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerQueueAllocator.class);
+ private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file";
+ // currentlyWatching and loaderService are only read or written from synchronized methods of this class.
+ private String currentlyWatching;
+ private AllocationFileLoaderService loaderService;
+ private final AtomicReference<AllocationConfiguration> allocationConfiguration
+ = new AtomicReference<AllocationConfiguration>();
+
+ /**
+ * Generates a Yarn FairScheduler queue resolver based on 'fair-scheduler.xml'.
+ * @param config The HiveConf configuration; its allocation-file property decides which file is watched.
+ * @param username The user to configure the job for (not used by this implementation).
+ * @return the shared holder that the reload listener updates with each loaded allocation configuration.
+ * @throws IOException declared by {@link QueueAllocator}; this implementation has no visible throw site.
+ */
+ public synchronized AtomicReference<AllocationConfiguration> makeConfigurationFor(Configuration config, String username) throws IOException {
+ updateWatcher(config);
+
+ return allocationConfiguration;
+ }
+
+ public synchronized void refresh(Configuration config) {
+ updateWatcher(config);
+ }
+
+ @VisibleForTesting
+ public synchronized String getCurrentlyWatchingFile() {
+ return this.currentlyWatching;
+ }
+
+ private void updateWatcher(Configuration config) {
+ String allocationFile = config.get(YARN_SCHEDULER_FILE_PROPERTY); // read once so the comparison and the stored value agree
+ if (this.loaderService != null && StringUtils.equals(currentlyWatching, allocationFile)) { return; }
+ this.currentlyWatching = allocationFile;
+ // First call, or the watched file changed: replace the loader service with one initialised from config.
+ if (this.loaderService != null) {
+ this.loaderService.stop();
+ }
+
+ this.loaderService = new AllocationFileLoaderService();
+ this.loaderService.init(config);
+ this.loaderService.setReloadListener(new AllocationFileLoaderService.Listener() {
+ @Override
+ public void onReload(AllocationConfiguration allocs) {
+ allocationConfiguration.set(allocs);
+ }
+ });
+ // Load once up front; a failure here is logged and deliberately not propagated.
+ try {
+ this.loaderService.reloadAllocations();
+ } catch (Exception ex) {
+ LOG.error("Failed to load queue allocations", ex);
+ }
+ // Only when nothing has ever been loaded is a configuration built directly from config; an earlier one is kept as-is.
+ if (allocationConfiguration.get() == null) {
+ allocationConfiguration.set(new AllocationConfiguration(config));
+ }
+
+ this.loaderService.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
index 372244d..4856c10 100644
--- a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
+++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
@@ -17,54 +17,90 @@
*/
package org.apache.hadoop.hive.schshim;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.SchedulerShim;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
+import java.io.IOException;
+
+/*
+ * FairSchedulerShim monitors changes in fair-scheduler.xml (if it exists) to allow for dynamic
+ * reloading and queue resolution. When changes to the fair-scheduler.xml file are detected, the
+ * cached queue resolution policies for each user are cleared, and then re-cached/validated on job-submit.
+ */
+
public class FairSchedulerShim implements SchedulerShim {
private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerShim.class);
private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
+ private final QueueAllocator queueAllocator;
+
+ @VisibleForTesting
+ public FairSchedulerShim(QueueAllocator queueAllocator) {
+ this.queueAllocator = queueAllocator;
+ }
+
+ public FairSchedulerShim() {
+ this(new FairSchedulerQueueAllocator());
+ }
+
+ /**
+ * Applies the default YARN fair scheduler queue for a user.
+ * @param conf - the current HiveConf configuration.
+ * @param forUser - the user to configure the default queue for.
+ * @throws IOException
+ */
@Override
- public void refreshDefaultQueue(Configuration conf, String userName)
- throws IOException {
- String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
- final AtomicReference<AllocationConfiguration> allocConf = new AtomicReference<AllocationConfiguration>();
+ public synchronized void refreshDefaultQueue(Configuration conf, String forUser)
+ throws IOException {
+ setJobQueueForUserInternal(conf, YarnConfiguration.DEFAULT_QUEUE_NAME, forUser); // ask the placement policy where the YARN default queue name maps for this user
+ }
- AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService();
- allocsLoader.init(conf);
- allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() {
- @Override
- public void onReload(AllocationConfiguration allocs) {
- allocConf.set(allocs);
- }
- });
- try {
- allocsLoader.reloadAllocations();
- } catch (Exception ex) {
- throw new IOException("Failed to load queue allocations", ex);
- }
- if (allocConf.get() == null) {
- allocConf.set(new AllocationConfiguration(conf));
+ /**
+ * Validates the YARN fair scheduler queue configuration.
+ * @param conf - the current HiveConf configuration.
+ * @param forUser - the user to configure the default queue for.
+ * @throws IOException
+ */
+ @Override
+ public synchronized void validateQueueConfiguration(Configuration conf, String forUser) throws IOException {
+ // Currently, "validation" is just to ensure that the client can still set the same queue that they
+ // could previously. In almost all situations, this should be essentially a no-op (unless the fair-scheduler.xml
+ // file changes in such a way that this is disallowed). Currently this implementation is just intended to allow us
+ // to validate that the user's configuration is at least reasonable on a per-request basis, beyond the already-
+ // occurring per-session setup.
+
+ // TODO: Build out ACL enforcement.
+ // Re-run placement for the queue already set in conf; when none is set, take the default-queue path instead.
+ String currentJobQueue = conf.get(MR2_JOB_QUEUE_PROPERTY);
+ if (currentJobQueue != null && !currentJobQueue.isEmpty()) {
+ setJobQueueForUserInternal(conf, currentJobQueue, forUser);
+ } else {
+ refreshDefaultQueue(conf, forUser);
+ }
- QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy();
+ }
+
+ public QueueAllocator getQueueAllocator() {
+ return this.queueAllocator;
+ }
+
+ private void setJobQueueForUserInternal(Configuration conf, String queueName, String forUser) throws IOException {
+ QueuePlacementPolicy queuePolicy = queueAllocator.makeConfigurationFor(conf, forUser).get().getPlacementPolicy();
+ // Without a placement policy the job queue property in conf is left untouched.
if (queuePolicy != null) {
- requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
+ String requestedQueue = queuePolicy.assignAppToQueue(queueName, forUser);
if (StringUtils.isNotBlank(requestedQueue)) {
- LOG.debug("Setting queue name to " + requestedQueue + " for user "
- + userName);
+ LOG.info("Setting queue name to: '{}' for user '{}'", requestedQueue, forUser);
conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
+ } else {
+ LOG.warn("Unable to set queue: {} for user: {}, resetting to user's default queue.", queueName, forUser); // log the queue that was asked for; requestedQueue is blank on this branch
+ conf.set(MR2_JOB_QUEUE_PROPERTY, queuePolicy.assignAppToQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, forUser)); // NOTE(review): if the policy also returns null here, conf.set receives a null value - confirm this cannot happen
}
}
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java
new file mode 100644
index 0000000..daf02da
--- /dev/null
+++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.schshim;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public interface QueueAllocator {
+ /**
+ * Generates a queue resolver for a given configuration and username.
+ * @param configuration The HiveConf configuration.
+ * @param username The user to configure the job for.
+ * @return Returns a holder for the queue allocation configuration; FairSchedulerShim dereferences the held value without a null check.
+ * @throws IOException if the allocation configuration cannot be produced (presumably; implementation-defined).
+ */
+ AtomicReference<AllocationConfiguration> makeConfigurationFor(Configuration configuration, String username) throws IOException;
+ void refresh(Configuration configuration); // re-syncs the allocator with the given configuration; FairSchedulerQueueAllocator uses it to re-point its allocation-file watcher
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java b/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java
new file mode 100644
index 0000000..58465e4
--- /dev/null
+++ b/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.schshim;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestFairSchedulerQueueAllocator {
+ private static final String EMPTY = "";
+ private static final int USERNAME_ARGUMENT_INDEX = 1;
+ private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file";
+ private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
+
+ @Test
+ public void testChangingLastUsedHiveConfigurationStringDirectly() throws Exception {
+ Configuration configuration = new Configuration();
+ FairSchedulerShim shim = new FairSchedulerShim();
+ FairSchedulerQueueAllocator allocator = (FairSchedulerQueueAllocator) shim.getQueueAllocator();
+
+ // On initialization should be uncached.
+ assertNull(allocator.getCurrentlyWatchingFile());
+
+ // Per job submission the location of fair-scheduler should be updated.
+ for (String location : new String[] { "/first", "/second", "third/fourth" }){
+ for (String user : new String[] { "firstUser", "secondUser", "thirdUser" }) {
+ configuration.set(YARN_SCHEDULER_FILE_PROPERTY, location);
+ shim.refreshDefaultQueue(configuration, user);
+ assertEquals(location, allocator.getCurrentlyWatchingFile());
+ }
+ }
+ }
+
+ @Test
+ public void testNeverBeforeSeenUsersEffectOnLastUsedHiveConfigurationString() throws Exception {
+ final Configuration configuration = new Configuration();
+ FairSchedulerShim shim = new FairSchedulerShim();
+ FairSchedulerQueueAllocator allocator = (FairSchedulerQueueAllocator) shim.getQueueAllocator();
+
+ // The watched location must stay the same no matter which (new) user submits a job.
+ configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/unchanging/location");
+ for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) {
+ shim.refreshDefaultQueue(configuration, user);
+ assertEquals("/some/unchanging/location", allocator.getCurrentlyWatchingFile());
+ }
+ }
+
+ @Test
+ public void testQueueAllocation() throws Exception {
+ Configuration configuration = new Configuration();
+ QueueAllocator allocator = mock(QueueAllocator.class);
+
+ when(allocator.makeConfigurationFor(any(Configuration.class), any(String.class)))
+ .thenAnswer(new Answer<AtomicReference<AllocationConfiguration>>() {
+ @Override
+ public AtomicReference<AllocationConfiguration> answer(InvocationOnMock invocationOnMock) throws Throwable {
+ // Capture which user is causing the reset for verification purposes.
+ final String username = (String) invocationOnMock.getArguments()[USERNAME_ARGUMENT_INDEX];
+
+ AllocationConfiguration allocationConfiguration = mock(AllocationConfiguration.class);
+ when(allocationConfiguration.getPlacementPolicy())
+ .thenAnswer(new Answer<QueuePlacementPolicy>() {
+ @Override
+ public QueuePlacementPolicy answer(InvocationOnMock invocationOnMock) throws Throwable {
+ QueuePlacementPolicy placementPolicy = mock(QueuePlacementPolicy.class);
+ when(placementPolicy.assignAppToQueue(any(String.class), any(String.class)))
+ .thenAnswer(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return String.format("queue.for.%s", username);
+ }
+ });
+
+ return placementPolicy;
+ }
+ });
+
+ return new AtomicReference<>(allocationConfiguration);
+ }
+ });
+
+ FairSchedulerShim shim = new FairSchedulerShim(allocator);
+
+ // Each user must end up with the queue name the mocked placement policy returns for them.
+ configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/file/location");
+ for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) {
+ shim.refreshDefaultQueue(configuration, user);
+
+ String queueName = String.format("queue.for.%s", user);
+ assertEquals(queueName, configuration.get(MR2_JOB_QUEUE_PROPERTY));
+ }
+ }
+}
\ No newline at end of file