You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2017/01/11 23:12:38 UTC

hive git commit: HIVE-13696: Monitor fair-scheduler.xml and automatically update/validate jobs submitted to fair-scheduler (Reuben Kuhnert, reviewed by Yongzhi Chen and Mohit Sabharwal)

Repository: hive
Updated Branches:
  refs/heads/master 58247c563 -> 323c835e5


HIVE-13696: Monitor fair-scheduler.xml and automatically update/validate jobs submitted to fair-scheduler (Reuben Kuhnert, reviewed by Yongzhi Chen and Mohit Sabharwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/323c835e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/323c835e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/323c835e

Branch: refs/heads/master
Commit: 323c835e52ded7fe7a195e20d722a08edd247b83
Parents: 58247c5
Author: Sergio Pena <se...@cloudera.com>
Authored: Wed Jan 11 17:11:34 2017 -0600
Committer: Sergio Pena <se...@cloudera.com>
Committed: Wed Jan 11 17:11:34 2017 -0600

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  11 ++
 .../hive/ql/session/YarnFairScheduling.java     |  70 +++++++++++
 .../service/cli/session/HiveSessionImpl.java    |   8 +-
 .../apache/hadoop/hive/shims/SchedulerShim.java |   2 +
 shims/scheduler/pom.xml                         |   7 +-
 .../schshim/FairSchedulerQueueAllocator.java    |  93 ++++++++++++++
 .../hadoop/hive/schshim/FairSchedulerShim.java  |  96 ++++++++++-----
 .../hadoop/hive/schshim/QueueAllocator.java     |  37 ++++++
 .../TestFairSchedulerQueueAllocator.java        | 120 +++++++++++++++++++
 9 files changed, 409 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index fd6020b..a367f27 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.session.YarnFairScheduling;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
 import org.apache.hadoop.hive.shims.Utils;
@@ -539,6 +540,8 @@ public class Driver implements CommandProcessor {
         plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());
       }
 
+      configureScheduling(conf, userName);
+
       //do the authorization check
       if (!sem.skipAuthorization() &&
           HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {
@@ -663,6 +666,14 @@ public class Driver implements CommandProcessor {
     }
   }
 
+  private HiveConf configureScheduling(HiveConf configuration, String forUser) throws IOException, HiveException {
+    if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(configuration)) {
+        YarnFairScheduling.validateYarnQueue(configuration, forUser);
+    }
+
+    return configuration;
+  }
+
   private ImmutableMap<String, Long> dumpMetaCallTimingWithoutEx(String phase) {
     try {
       return Hive.get().dumpAndClearMetaCallTiming(phase);

http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
new file mode 100644
index 0000000..e3ba47c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/YarnFairScheduling.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+import java.io.IOException;
+
+/**
+ * A front handle for managing job submission to Yarn-FairScheduler.
+ */
+public class YarnFairScheduling {
+  /**
+   * Determine if jobs can be configured for YARN fair scheduling.
+   * @param conf - the current HiveConf configuration.
+   * @return Returns true when impersonation mode is disabled and fair-scheduling is enabled.
+   */
+  public static boolean usingNonImpersonationModeWithFairScheduling(HiveConf conf) {
+    return (conf != null)
+      && (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)
+      && (conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)));
+  }
+
+  /**
+   * Configure the default YARN queue for the user.
+   * @param conf - The current HiveConf configuration.
+   * @param forUser - The user to configure scheduling for.
+   * @throws IOException
+   * @throws HiveException
+   */
+  public static void setDefaultJobQueue(HiveConf conf, String forUser) throws IOException, HiveException {
+    Preconditions.checkState(usingNonImpersonationModeWithFairScheduling(conf),
+      "Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled.");
+
+    ShimLoader.getSchedulerShims().refreshDefaultQueue(conf, forUser);
+  }
+
+  /**
+   * Validate the current YARN queue for the current user.
+   * @param conf - The current HiveConf configuration.
+   * @param forUser - The user whose YARN queue is validated.
+   * @throws IOException
+   * @throws HiveException
+   */
+  public static void validateYarnQueue(HiveConf conf, String forUser) throws IOException, HiveException {
+    Preconditions.checkState(usingNonImpersonationModeWithFairScheduling(conf),
+      "Unable to map job to fair-scheduler because either impersonation is on or fair-scheduling is disabled.");
+
+    ShimLoader.getSchedulerShims().validateQueueConfiguration(conf, forUser);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index f939a93..460a26b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.SetProcessor;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.YarnFairScheduling;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -130,11 +131,10 @@ public class HiveSessionImpl implements HiveSession {
     try {
       // In non-impersonation mode, map scheduler queue to current user
       // if fair scheduler is configured.
-      if (! sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
-        sessionConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
-        ShimLoader.getHadoopShims().refreshDefaultQueue(sessionConf, username);
+      if (YarnFairScheduling.usingNonImpersonationModeWithFairScheduling(sessionConf)) {
+        YarnFairScheduling.setDefaultJobQueue(sessionConf, username);
       }
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.warn("Error setting scheduler queue: " + e, e);
     }
     // Set an explicit session name to control the download directory name

http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
index 63803b8..f88e192 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/SchedulerShim.java
@@ -34,4 +34,6 @@ public interface SchedulerShim {
    */
   public void refreshDefaultQueue(Configuration conf, String userName)
       throws IOException;
+
+  public void validateQueueConfiguration(Configuration configuration, String forUser) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/shims/scheduler/pom.xml b/shims/scheduler/pom.xml
index 9141c1e..4242973 100644
--- a/shims/scheduler/pom.xml
+++ b/shims/scheduler/pom.xml
@@ -77,7 +77,12 @@
      <version>${hadoop.version}</version>
      <optional>true</optional>
    </dependency>
-   <dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-all.version}</version>
+    </dependency>
+    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-tests</artifactId>
      <version>${hadoop.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java
new file mode 100644
index 0000000..0e32ff0
--- /dev/null
+++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerQueueAllocator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.schshim;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FairSchedulerQueueAllocator implements QueueAllocator {
+  private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerQueueAllocator.class);
+  private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file";
+
+  private String currentlyWatching;
+  private AllocationFileLoaderService loaderService;
+  private final AtomicReference<AllocationConfiguration> allocationConfiguration
+    = new AtomicReference<AllocationConfiguration>();
+
+  /**
+   * Generates a Yarn FairScheduler queue resolver based on 'fair-scheduler.xml'.
+   * @param config The HiveConf configuration.
+   * @param username      The user to configure the job for.
+   * @return Returns a configured allocation resolver.
+   * @throws IOException
+   */
+  public synchronized AtomicReference<AllocationConfiguration> makeConfigurationFor(Configuration config, String username) throws IOException {
+    updateWatcher(config);
+
+    return allocationConfiguration;
+  }
+
+  public synchronized void refresh(Configuration config) {
+    updateWatcher(config);
+  }
+
+  @VisibleForTesting
+  public String getCurrentlyWatchingFile() {
+    return this.currentlyWatching;
+  }
+
+  private void updateWatcher(Configuration config) {
+    if (this.loaderService != null && StringUtils.equals(currentlyWatching, config.get(YARN_SCHEDULER_FILE_PROPERTY))) return;
+
+    this.currentlyWatching = config.get(YARN_SCHEDULER_FILE_PROPERTY);
+
+    if (this.loaderService != null) {
+      this.loaderService.stop();
+    }
+
+    this.loaderService = new AllocationFileLoaderService();
+    this.loaderService.init(config);
+    this.loaderService.setReloadListener(new AllocationFileLoaderService.Listener() {
+      @Override
+      public void onReload(AllocationConfiguration allocs) {
+        allocationConfiguration.set(allocs);
+      }
+    });
+
+    try {
+      this.loaderService.reloadAllocations();
+    } catch (Exception ex) {
+      LOG.error("Failed to load queue allocations", ex);
+    }
+
+    if (allocationConfiguration.get() == null) {
+      allocationConfiguration.set(new AllocationConfiguration(config));
+    }
+
+    this.loaderService.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
index 372244d..4856c10 100644
--- a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
+++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/FairSchedulerShim.java
@@ -17,54 +17,90 @@
  */
 package org.apache.hadoop.hive.schshim;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.shims.SchedulerShim;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
 
+import java.io.IOException;
+
+/*
+ * FairSchedulerShim monitors changes in fair-scheduler.xml (if it exists) to allow for dynamic
+ * reloading and queue resolution. When changes to the fair-scheduler.xml file are detected, the
+ * cached queue resolution policies for each user are cleared, and then re-cached/validated on job-submit.
+ */
+
 public class FairSchedulerShim implements SchedulerShim {
   private static final Logger LOG = LoggerFactory.getLogger(FairSchedulerShim.class);
   private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
 
+  private final QueueAllocator queueAllocator;
+
+  @VisibleForTesting
+  public FairSchedulerShim(QueueAllocator queueAllocator) {
+    this.queueAllocator = queueAllocator;
+  }
+
+  public FairSchedulerShim() {
+    this(new FairSchedulerQueueAllocator());
+  }
+
+  /**
+   * Applies the default YARN fair scheduler queue for a user.
+   * @param conf - the current HiveConf configuration.
+   * @param forUser - the user to configure the default queue for.
+   * @throws IOException
+   */
   @Override
-  public void refreshDefaultQueue(Configuration conf, String userName)
-      throws IOException {
-    String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
-    final AtomicReference<AllocationConfiguration> allocConf = new AtomicReference<AllocationConfiguration>();
+  public synchronized void refreshDefaultQueue(Configuration conf, String forUser)
+    throws IOException {
+    setJobQueueForUserInternal(conf, YarnConfiguration.DEFAULT_QUEUE_NAME, forUser);
+  }
 
-    AllocationFileLoaderService allocsLoader = new AllocationFileLoaderService();
-    allocsLoader.init(conf);
-    allocsLoader.setReloadListener(new AllocationFileLoaderService.Listener() {
-      @Override
-      public void onReload(AllocationConfiguration allocs) {
-        allocConf.set(allocs);
-      }
-    });
-    try {
-      allocsLoader.reloadAllocations();
-    } catch (Exception ex) {
-      throw new IOException("Failed to load queue allocations", ex);
-    }
-    if (allocConf.get() == null) {
-      allocConf.set(new AllocationConfiguration(conf));
+  /**
+   * Validates the YARN fair scheduler queue configuration.
+   * @param conf - the current HiveConf configuration.
+   * @param forUser - the user whose queue configuration is validated.
+   * @throws IOException
+   */
+  @Override
+  public synchronized void validateQueueConfiguration(Configuration conf, String forUser) throws IOException {
+    // Currently, "validation" just ensures that the client can still set the same queue that they
+    // could previously. In almost all situations, this should be essentially a no-op (unless the fair-scheduler.xml
+    // file changes in such a way that this is disallowed). This implementation is only intended to let us
+    // validate that the user's configuration is at least reasonable on a per-request basis, in addition to the
+    // per-session setup that already occurs.
+
+    // TODO: Build out ACL enforcement.
+
+    String currentJobQueue = conf.get(MR2_JOB_QUEUE_PROPERTY);
+    if (currentJobQueue != null && !currentJobQueue.isEmpty()) {
+      setJobQueueForUserInternal(conf, currentJobQueue, forUser);
+    } else {
+      refreshDefaultQueue(conf, forUser);
     }
-    QueuePlacementPolicy queuePolicy = allocConf.get().getPlacementPolicy();
+  }
+
+  public QueueAllocator getQueueAllocator() {
+    return this.queueAllocator;
+  }
+
+  private void setJobQueueForUserInternal(Configuration conf, String queueName, String forUser) throws IOException {
+    QueuePlacementPolicy queuePolicy = queueAllocator.makeConfigurationFor(conf, forUser).get().getPlacementPolicy();
+
     if (queuePolicy != null) {
-      requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
+      String requestedQueue = queuePolicy.assignAppToQueue(queueName, forUser);
       if (StringUtils.isNotBlank(requestedQueue)) {
-        LOG.debug("Setting queue name to " + requestedQueue + " for user "
-            + userName);
+        LOG.info("Setting queue name to: '{}' for user '{}'", requestedQueue, forUser);
         conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
+      } else {
+        LOG.warn("Unable to set queue: {} for user: {}, resetting to user's default queue.", requestedQueue, forUser);
+        conf.set(MR2_JOB_QUEUE_PROPERTY, queuePolicy.assignAppToQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, forUser));
       }
     }
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java
new file mode 100644
index 0000000..daf02da
--- /dev/null
+++ b/shims/scheduler/src/main/java/org/apache/hadoop/hive/schshim/QueueAllocator.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.schshim;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public interface QueueAllocator {
+  /**
+   * Generates a queue resolver for a given configuration and username.
+   * @param configuration The HiveConf configuration.
+   * @param username The user to configure the job for.
+   * @return Returns the queue allocation configuration.
+   * @throws IOException
+   */
+  AtomicReference<AllocationConfiguration> makeConfigurationFor(Configuration configuration, String username) throws IOException;
+  void refresh(Configuration configuration);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/323c835e/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java
----------------------------------------------------------------------
diff --git a/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java b/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java
new file mode 100644
index 0000000..58465e4
--- /dev/null
+++ b/shims/scheduler/src/test/java/org/apache/hadoop/hive/schshim/TestFairSchedulerQueueAllocator.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.schshim;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestFairSchedulerQueueAllocator {
+  private static final String EMPTY = "";
+  private static final int USERNAME_ARGUMENT_INDEX = 1;
+  private static final String YARN_SCHEDULER_FILE_PROPERTY = "yarn.scheduler.fair.allocation.file";
+  private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
+
+  @Test
+  public void testChangingLastUsedHiveConfigurationStringDirectly() throws Exception {
+    Configuration configuration = new Configuration();
+    FairSchedulerShim shim = new FairSchedulerShim();
+    FairSchedulerQueueAllocator allocator = (FairSchedulerQueueAllocator) shim.getQueueAllocator();
+
+    // On initialization, no allocation file should be watched yet.
+    assertNull(allocator.getCurrentlyWatchingFile());
+
+    // Per job submission the location of fair-scheduler should be updated.
+    for (String location : new String[] { "/first", "/second", "third/fourth" }){
+      for (String user : new String[] { "firstUser", "secondUser", "thirdUser" }) {
+        configuration.set(YARN_SCHEDULER_FILE_PROPERTY, location);
+        shim.refreshDefaultQueue(configuration, user);
+        assertEquals(allocator.getCurrentlyWatchingFile(), location);
+      }
+    }
+  }
+
+  @Test
+  public void testNeverBeforeSeenUsersEffectOnLastUsedHiveConfigurationString() throws Exception {
+    final Configuration configuration = new Configuration();
+    FairSchedulerShim shim = new FairSchedulerShim();
+    FairSchedulerQueueAllocator allocator = (FairSchedulerQueueAllocator) shim.getQueueAllocator();
+
+    // With an unchanged setting, the watched fair-scheduler location should stay the same for every new user.
+    configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/unchanging/location");
+    for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) {
+      shim.refreshDefaultQueue(configuration, user);
+      assertEquals(allocator.getCurrentlyWatchingFile(), "/some/unchanging/location");
+    }
+  }
+
+  @Test
+  public void testQueueAllocation() throws Exception {
+    Configuration configuration = new Configuration();
+    QueueAllocator allocator = mock(QueueAllocator.class);
+
+    when(allocator.makeConfigurationFor(any(Configuration.class), any(String.class)))
+      .thenAnswer(new Answer<AtomicReference<AllocationConfiguration>>() {
+        @Override
+        public AtomicReference<AllocationConfiguration> answer(InvocationOnMock invocationOnMock) throws Throwable {
+          // Capture which user is causing the reset for verification purposes.
+          final String username = (String) invocationOnMock.getArguments()[USERNAME_ARGUMENT_INDEX];
+
+          AllocationConfiguration allocationConfiguration = mock(AllocationConfiguration.class);
+          when(allocationConfiguration.getPlacementPolicy())
+            .thenAnswer(new Answer<QueuePlacementPolicy>() {
+              @Override
+              public QueuePlacementPolicy answer(InvocationOnMock invocationOnMock) throws Throwable {
+                QueuePlacementPolicy placementPolicy = mock(QueuePlacementPolicy.class);
+                when(placementPolicy.assignAppToQueue(any(String.class), any(String.class)))
+                  .thenAnswer(new Answer<String>() {
+                    @Override
+                    public String answer(InvocationOnMock invocationOnMock) throws Throwable {
+                      return String.format("queue.for.%s", username);
+                    }
+                  });
+
+                return placementPolicy;
+              }
+            });
+
+          return new AtomicReference<>(allocationConfiguration);
+        }
+      });
+
+    FairSchedulerShim shim = new FairSchedulerShim(allocator);
+
+    // On each refresh, the job queue should be set to the queue the placement policy assigns to that user.
+    configuration.set(YARN_SCHEDULER_FILE_PROPERTY, "/some/file/location");
+    for (String user : new String[] { "first", "second", "third", "fourth", "fifth" }) {
+      shim.refreshDefaultQueue(configuration, user);
+
+      String queueName = String.format("queue.for.%s", user);
+      assertEquals(configuration.get(MR2_JOB_QUEUE_PROPERTY), queueName);
+    }
+  }
+}
\ No newline at end of file