You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2017/01/18 21:47:50 UTC
hive git commit: Revert "HIVE-13696: Monitor fair-scheduler.xml and
automatically update/validate jobs submitted to fair-scheduler (Reuben
Kuhnert, reviewed by Yongzhi Chen and Mohit Sabharwal)"
Repository: hive
Updated Branches:
refs/heads/master 65a65826a -> b5f5f68a5
Revert "HIVE-13696: Monitor fair-scheduler.xml and automatically update/validate jobs submitted to fair-scheduler (Reuben Kuhnert, reviewed by Yongzhi Chen and Mohit Sabharwal)"
This reverts commit 323c835e52ded7fe7a195e20d722a08edd247b83.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b5f5f68a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b5f5f68a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b5f5f68a
Branch: refs/heads/master
Commit: b5f5f68a5d14af8cde53fda7c4991747988a8587
Parents: 65a6582
Author: Sergio Pena <se...@cloudera.com>
Authored: Wed Jan 18 15:43:34 2017 -0600
Committer: Sergio Pena <se...@cloudera.com>
Committed: Wed Jan 18 15:43:34 2017 -0600
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 11 --
.../hive/ql/session/YarnFairScheduling.java | 70 -----------
.../service/cli/session/HiveSessionImpl.java | 8 +-
.../apache/hadoop/hive/shims/SchedulerShim.java | 2 -
shims/scheduler/pom.xml | 7 +-
.../schshim/FairSchedulerQueueAllocator.java | 93 --------------
.../hadoop/hive/schshim/FairSchedulerShim.java | 96 +++++----------
.../hadoop/hive/schshim/QueueAllocator.java | 37 ------
.../TestFairSchedulerQueueAllocator.java | 120 -------------------
9 files changed, 35 insertions(+), 409 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b5f5f68a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a367f27..fd6020b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -116,7 +116,6 @@ import org.apache.hadoop.hive.ql.session.OperationLog;
import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.session.YarnFairScheduling;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
import org.apache.hadoop.hive.shims.Utils;
@@ -540,8 +539,6 @@ public class Driver implements CommandProcessor {
plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());
}
- configureScheduling(conf, userName);
-
//do the authorization check
if (!sem.skipAuthorization() &&
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
@@ -666,14 +663,6 @@ public class Driver implements CommandProcessor {
}
}
- private HiveConf configureScheduling(HiveConf configuration, String forUser) throws IOException, HiveException {
- if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(configuration)) {
- YarnFairScheduling.validateYarnQueue(configuration, forUser);
- }
-
- return configuration;
- }
-
private ImmutableMap<String, Long> dumpMetaCallTimingWithoutEx(String phase) {
try {
return Hive.get().dumpAndClearMetaCallTiming(phase);
http://git-wip-us.apache.org/repos/asf/hive/blob/b5f5f68a/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
deleted file mode 100644
index e3ba47c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.session;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.shims.ShimLoader;
-
-import java.io.IOException;
-
-/**
- * A front handle for managing job submission to Yarn-FairScheduler.
- */
-public class YarnFairScheduling {
- /**
- * Determine if jobs can be configured for YARN fair scheduling.
- * @param conf - the current HiveConf configuration.
- * @return Returns true when impersonation mode is disabled and fair-scheduling is enabled.
- */
- public static boolean usingNonImpersonationModeWithFairScheduling(HiveConf conf) {
- return (conf != null)
- && (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)
- && (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)));
- }
-
- /**
- * Configure the default YARN queue for the user.
- * @param conf - The current HiveConf configuration.
- * @param forUser - The user to configure scheduling for.
- * @throws IOException
- * @throws HiveException
- */
- public static void setDefaultJobQueue(HiveConf conf, String forUser) throws IOException, HiveException {
- Preconditions.checkState(usingNonImpersonationModeWithFairScheduling(conf),
- "Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled.");
-
- ShimLoader.getSchedulerShims().refreshDefaultQueue(conf, forUser);
- }
-
- /**
- * Validate the current YARN queue for the current user.
- * @param conf - The current HiveConf configuration.
- * @param forUser - The user to configure scheduling for.
- * @throws IOException
- * @throws HiveException
- */
- public static void validateYarnQueue(HiveConf conf, String forUser) throws IOException, HiveException {
- Preconditions.checkState(usingNonImpersonationModeWithFairScheduling(conf),
- "Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled.");
-
- ShimLoader.getSchedulerShims().validateQueueConfiguration(conf, forUser);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b5f5f68a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 460a26b..f939a93 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.session.YarnFairScheduling;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -131,10 +130,11 @@ public class HiveSessionImpl implements HiveSession {
try {
// In non-impersonation mode, map scheduler queue to current user
// if fair scheduler is configured.
- if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(sessionConf)) {
- YarnFairScheduling.setDefaultJobQueue(sessionConf, username);
+ if (! sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+ sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
+ ShimLoader.getHadoopShims().refreshDefaultQueue(sessionConf, username);
}
- } catch (Exception e) {
+ } catch (IOException e) {
LOG.warn("Error setting scheduler queue: " + e, e);
}
// Set an explicit session name to control the download directory name
http://git-wip-us.apache.org/repos/asf/hive/blob/b5f5f68a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
index f88e192..63803b8 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
@@ -34,6 +34,4 @@ public interface SchedulerShim {
*/
public void refreshDefaultQueue(Configuration conf, String userName)
throws IOException;
-
- public void validateQueueConfiguration(Configuration configuration, String forUser) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b5f5f68a/shims/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/shims/scheduler/pom.xml b/shims/scheduler/pom.xml
index 4242973..9141c1e 100644
--- a/shims/scheduler/pom.xml
+++ b/shims/scheduler/pom.xml
@@ -77,12 +77,7 @@
<version>${hadoop.version}</version>
<optional>true</optional>
</dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>${mockito-all.version}</version>
- </dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<version>${hadoop.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/b5f5f68a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java
deleted file mode 100644
index 0e32ff0..0000000
--- a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.schshim;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class FairSchedulerQueueAllocator implements QueueAllocator {
- private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerQueueAllocator.class);
- private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file";
-
- private String currentlyWatching;
- private AllocationFileLoaderService loaderService;
- private final AtomicReference<AllocationConfiguration> allocationConfiguration
- = new AtomicReference<AllocationConfiguration>();
-
- /**
- * Generates a Yarn FairScheduler queue resolver based on 'fair-scheduler.xml'.
- * @param config The HiveConf configuration.
- * @param username The user to configure the job for.
- * @return Returns a configured allocation resolver.
- * @throws IOException
- */
- public synchronized AtomicReference<AllocationConfiguration> makeConfigurationFor(Configuration config, String username) throws IOException {
- updateWatcher(config);
-
- return allocationConfiguration;
- }
-
- public synchronized void refresh(Configuration config) {
- updateWatcher(config);
- }
-
- @VisibleForTesting
- public String getCurrentlyWatchingFile() {
- return this.currentlyWatching;
- }
-
- private void updateWatcher(Configuration config) {
- if (this.loaderService != null && StringUtils.equals(currentlyWatching, config.get(YARN_SCHEDULER_FILE_PROPERTY))) return;
-
- this.currentlyWatching = config.get(YARN_SCHEDULER_FILE_PROPERTY);
-
- if (this.loaderService != null) {
- this.loaderService.stop();
- }
-
- this.loaderService = new AllocationFileLoaderService();
- this.loaderService.init(config);
- this.loaderService.setReloadListener(new AllocationFileLoaderService.Listener() {
- @Override
- public void onReload(AllocationConfiguration allocs) {
- allocationConfiguration.set(allocs);
- }
- });
-
- try {
- this.loaderService.reloadAllocations();
- } catch (Exception ex) {
- LOG.error("Failed to load queue allocations", ex);
- }
-
- if (allocationConfiguration.get() == null) {
- allocationConfiguration.set(new AllocationConfiguration(config));
- }
-
- this.loaderService.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/b5f5f68a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
index 4856c10..372244d 100644
--- a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
+++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
@@ -17,90 +17,54 @@
*/
package org.apache.hadoop.hive.schshim;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.SchedulerShim;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
-import java.io.IOException;
-
-/*
- * FairSchedulerShim monitors changes in fair-scheduler.xml (if it exists) to allow for dynamic
- * reloading and queue resolution. When changes to the fair-scheduler.xml file are detected, the
- * cached queue resolution policies for each user are cleared, and then re-cached/validated on job-submit.
- */
-
public class FairSchedulerShim implements SchedulerShim {
private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerShim.class);
private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
- private final QueueAllocator queueAllocator;
-
- @VisibleForTesting
- public FairSchedulerShim(QueueAllocator queueAllocator) {
- this.queueAllocator = queueAllocator;
- }
-
- public FairSchedulerShim() {
- this(new FairSchedulerQueueAllocator());
- }
-
- /**
- * Applies the default YARN fair scheduler queue for a user.
- * @param conf - the current HiveConf configuration.
- * @param forUser - the user to configure the default queue for.
- * @throws IOException
- */
@Override
- public synchronized void refreshDefaultQueue(Configuration conf, String forUser)
- throws IOException {
- setJobQueueForUserInternal(conf, YarnConfiguration.DEFAULT_QUEUE_NAME, forUser);
- }
+ public void refreshDefaultQueue(Configuration conf, String userName)
+ throws IOException {
+ String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
+ final AtomicReference<AllocationConfiguration> allocConf = new AtomicReference<AllocationConfiguration>();
- /**
- * Validates the YARN fair scheduler queue configuration.
- * @param conf - the current HiveConf configuration.
- * @param forUser - the user to configure the default queue for.
- * @throws IOException
- */
- @Override
- public synchronized void validateQueueConfiguration(Configuration conf, String forUser) throws IOException {
- // Currently, "validation" is just to ensure that the client can still set the same queue that they
- // could previously. In almost all situations, this should be essentially a no-op (unless the fair-scheduler.xml
- // file changes in such a way as this is disallowed). Currently this implementation is just inteded to allow us
- // to validate that the user's configuration is at least reasonable on a per-request basis beyond from the already-
- // occurring per session setup.
-
- // TODO: Build out ACL enforcement.
-
- String currentJobQueue = conf.get(MR2_JOB_QUEUE_PROPERTY);
- if (currentJobQueue != null && !currentJobQueue.isEmpty()) {
- setJobQueueForUserInternal(conf, currentJobQueue, forUser);
- } else {
- refreshDefaultQueue(conf, forUser);
+ AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService();
+ allocsLoader.init(conf);
+ allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() {
+ @Override
+ public void onReload(AllocationConfiguration allocs) {
+ allocConf.set(allocs);
+ }
+ });
+ try {
+ allocsLoader.reloadAllocations();
+ } catch (Exception ex) {
+ throw new IOException("Failed to load queue allocations", ex);
}
- }
-
- public QueueAllocator getQueueAllocator() {
- return this.queueAllocator;
- }
-
- private void setJobQueueForUserInternal(Configuration conf, String queueName, String forUser) throws IOException {
- QueuePlacementPolicy queuePolicy = queueAllocator.makeConfigurationFor(conf, forUser).get().getPlacementPolicy();
-
+ if (allocConf.get() == null) {
+ allocConf.set(new AllocationConfiguration(conf));
+ }
+ QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy();
if (queuePolicy != null) {
- String requestedQueue = queuePolicy.assignAppToQueue(queueName, forUser);
+ requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
if (StringUtils.isNotBlank(requestedQueue)) {
- LOG.info("Setting queue name to: '{}' for user '{}'", requestedQueue, forUser);
+ LOG.debug("Setting queue name to " + requestedQueue + " for user "
+ + userName);
conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
- } else {
- LOG.warn("Unable to set queue: {} for user: {}, resetting to user's default queue.", requestedQueue, forUser);
- conf.set(MR2_JOB_QUEUE_PROPERTY, queuePolicy.assignAppToQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, forUser));
}
}
}
-}
\ No newline at end of file
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b5f5f68a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java
deleted file mode 100644
index daf02da..0000000
--- a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.schshim;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-public interface QueueAllocator {
- /**
- * Generates a queue resolver for a given configuration and username.
- * @param configuration The HiveConf configuration.
- * @param username The user to configure the job for.
- * @return Returns the queue allocation configuration.
- * @throws IOException
- */
- AtomicReference<AllocationConfiguration> makeConfigurationFor(Configuration configuration, String username) throws IOException;
- void refresh(Configuration configuration);
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/b5f5f68a/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java b/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java
deleted file mode 100644
index 58465e4..0000000
--- a/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.schshim;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestFairSchedulerQueueAllocator {
- private static final String EMPTY = "";
- private static final int USERNAME_ARGUMENT_INDEX = 1;
- private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file";
- private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
-
- @Test
- public void testChangingLastUsedHiveConfigurationStringDirectly() throws Exception {
- Configuration configuration = new Configuration();
- FairSchedulerShim shim = new FairSchedulerShim();
- FairSchedulerQueueAllocator allocator = (FairSchedulerQueueAllocator) shim.getQueueAllocator();
-
- // On initialization should be uncached.
- assertNull(allocator.getCurrentlyWatchingFile());
-
- // Per job submission the location of fair-scheduler should be updated.
- for (String location : new String[] { "/first", "/second", "third/fourth" }){
- for (String user : new String[] { "firstUser", "secondUser", "thirdUser" }) {
- configuration.set(YARN_SCHEDULER_FILE_PROPERTY, location);
- shim.refreshDefaultQueue(configuration, user);
- assertEquals(allocator.getCurrentlyWatchingFile(), location);
- }
- }
- }
-
- @Test
- public void testNeverBeforeSeenUsersEffectOnLastUsedHiveConfigurationString() throws Exception {
- final Configuration configuration = new Configuration();
- FairSchedulerShim shim = new FairSchedulerShim();
- FairSchedulerQueueAllocator allocator = (FairSchedulerQueueAllocator) shim.getQueueAllocator();
-
- // Per job submission the location of fair-scheduler should be updated.
- configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/unchanging/location");
- for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) {
- shim.refreshDefaultQueue(configuration, user);
- assertEquals(allocator.getCurrentlyWatchingFile(), "/some/unchanging/location");
- }
- }
-
- @Test
- public void testQueueAllocation() throws Exception {
- Configuration configuration = new Configuration();
- QueueAllocator allocator = mock(QueueAllocator.class);
-
- when(allocator.makeConfigurationFor(any(Configuration.class), any(String.class)))
- .thenAnswer(new Answer<AtomicReference<AllocationConfiguration>>() {
- @Override
- public AtomicReference<AllocationConfiguration> answer(InvocationOnMock invocationOnMock) throws Throwable {
- // Capture which user is causing the reset for verification purposes.
- final String username = (String) invocationOnMock.getArguments()[USERNAME_ARGUMENT_INDEX];
-
- AllocationConfiguration allocationConfiguration = mock(AllocationConfiguration.class);
- when(allocationConfiguration.getPlacementPolicy())
- .thenAnswer(new Answer<QueuePlacementPolicy>() {
- @Override
- public QueuePlacementPolicy answer(InvocationOnMock invocationOnMock) throws Throwable {
- QueuePlacementPolicy placementPolicy = mock(QueuePlacementPolicy.class);
- when(placementPolicy.assignAppToQueue(any(String.class), any(String.class)))
- .thenAnswer(new Answer<String>() {
- @Override
- public String answer(InvocationOnMock invocationOnMock) throws Throwable {
- return String.format("queue.for.%s", username);
- }
- });
-
- return placementPolicy;
- }
- });
-
- return new AtomicReference<>(allocationConfiguration);
- }
- });
-
- FairSchedulerShim shim = new FairSchedulerShim(allocator);
-
- // Per job submission the location of fair-scheduler should be updated.
- configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/file/location");
- for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) {
- shim.refreshDefaultQueue(configuration, user);
-
- String queueName = String.format("queue.for.%s", user);
- assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), queueName);
- }
- }
-}
\ No newline at end of file