You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/09/19 21:35:25 UTC

incubator-impala git commit: IMPALA-5920: Remove admission control dependency on YARN RM jar

Repository: incubator-impala
Updated Branches:
  refs/heads/master 1e63ff843 -> 8ad6d0331


IMPALA-5920: Remove admission control dependency on YARN RM jar

Impala's admission controller relies on the YARN
fair-scheduler.xml for configuration. That configuration is
loaded using YARN directly (ie. as a library by the
frontend). In Hadoop 3, a number of changes were made to the
YARN resourcemanager which break Impala. While we eventually
want to rethink the admission control configuration
(IMPALA-4159), in the meantime we at least should avoid
using unsupported YARN APIs.

This patch removes the fe dependency on the YARN artifact
'hadoop-yarn-server-resourcemanager' which contains private
APIs and isn't meant to be used as a library.

A subset of the required code has been added in
'common/yarn-extras', taken from Hadoop 2.6.0 in CDH, e.g.
see [1]. The code is added in packages 'org.apache.impala.*'
instead of 'org.apache.yarn.*'. Some code could be copied
as-is, those files are marked with the comment:
//YARNUTIL: VANILLA
Files that required some modifications are marked with:
//YARNUTIL: MODIFIED
Or, if all code except a dummy interface could be added:
//YARNUTIL: DUMMY IMPL

The goal the 'yarn-extras' is to make Impala's handling of
the AC configuration self-sufficient, i.e. it shouldn't
matter what version of Hadoop exists. As-is, this was tested
and found to work when Hadoop 2.6 is installed. Because
the yarn-extras/pom.xml still references hadoop-common,
hadoop-yarn-common, and hadoop-yarn-api, additional testing
will be required to ensure Impala using yarn-extras works
when installed along side Hadoop 3.

That testing for Hadoop 3 will be done later. Future changes
will make any other changes required for existing code to
work when Hadoop 3 is installed.

Testing:
* Ran existing tests on master.

1: https://www.cloudera.com/documentation/enterprise/release-notes/topics/cm_vd_cdh_package_tarball_512.html

Change-Id: I7efdd8ebea298836ca2a82c0a4ae037ac9285bcf
Reviewed-on: http://gerrit.cloudera.org:8080/8035
Reviewed-by: Matthew Jacobs <mj...@apache.org>
Tested-by: Matthew Jacobs <mj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8ad6d033
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8ad6d033
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8ad6d033

Branch: refs/heads/master
Commit: 8ad6d03310825418b6aa1d427200b07bc8bdb0bc
Parents: 1e63ff8
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Mon Sep 11 10:53:07 2017 -0700
Committer: Matthew Jacobs <mj...@apache.org>
Committed: Tue Sep 19 21:32:07 2017 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                  |   1 +
 common/yarn-extras/CMakeLists.txt               |  20 +
 common/yarn-extras/README.txt                   |   5 +
 common/yarn-extras/pom.xml                      | 118 ++++
 .../resource/ResourceWeights.java               |  26 +
 .../scheduler/fair/AllocationConfiguration.java | 184 +++++++
 .../fair/AllocationConfigurationException.java  |  40 ++
 .../fair/AllocationFileLoaderService.java       | 536 +++++++++++++++++++
 .../scheduler/fair/FSQueueType.java             |  32 ++
 .../fair/FairSchedulerConfiguration.java        |  77 +++
 .../scheduler/fair/QueuePlacementPolicy.java    | 181 +++++++
 .../scheduler/fair/QueuePlacementRule.java      | 367 +++++++++++++
 .../scheduler/fair/SchedulingPolicy.java        |  28 +
 .../impala/yarn/server/utils/BuilderUtils.java  |  41 ++
 fe/CMakeLists.txt                               |   3 +-
 fe/pom.xml                                      |  19 +-
 .../apache/impala/util/RequestPoolService.java  |   6 +-
 .../impala/util/TestRequestPoolService.java     |  23 +-
 18 files changed, 1690 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e8a2355..d60487f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -346,6 +346,7 @@ include_directories(SYSTEM ${KUDU_CLIENT_INCLUDE_DIR})
 add_subdirectory(common/function-registry)
 add_subdirectory(common/thrift)
 add_subdirectory(common/fbs)
+add_subdirectory(common/yarn-extras)
 add_subdirectory(be)
 add_subdirectory(fe)
 add_subdirectory(ext-data-source)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/yarn-extras/CMakeLists.txt b/common/yarn-extras/CMakeLists.txt
new file mode 100644
index 0000000..81144c3
--- /dev/null
+++ b/common/yarn-extras/CMakeLists.txt
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_custom_target(yarn-extras ALL
+  COMMAND $ENV{IMPALA_HOME}/bin/mvn-quiet.sh install -DskipTests
+)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/README.txt
----------------------------------------------------------------------
diff --git a/common/yarn-extras/README.txt b/common/yarn-extras/README.txt
new file mode 100644
index 0000000..543c596
--- /dev/null
+++ b/common/yarn-extras/README.txt
@@ -0,0 +1,5 @@
+Extra Hadoop classes from Yarn needed by Impala.
+
+This is necessary because Impala has an admission controller that is configured using the
+same configuration as Yarn (i.e. a fair-scheduler.xml). Some Yarn classes are used to
+provide user to pool resolution, authorization, and accessing pool configurations.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/pom.xml
----------------------------------------------------------------------
diff --git a/common/yarn-extras/pom.xml b/common/yarn-extras/pom.xml
new file mode 100644
index 0000000..8f3ba4d
--- /dev/null
+++ b/common/yarn-extras/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.impala</groupId>
+  <artifactId>yarn-extras</artifactId>
+  <version>0.1-SNAPSHOT</version>
+  <name>YARN Extras</name>
+  <description>Extra Hadoop classes from YARN needed by Impala</description>
+  <packaging>jar</packaging>
+  <url>.</url>
+
+  <properties>
+    <hadoop.version>${env.IMPALA_HADOOP_VERSION}</hadoop.version>
+  </properties>
+
+  <repositories>
+    <repository>
+       <id>apache.snapshots</id>
+       <name>Apache Development Snapshot Repository</name>
+        <url>https://repository.apache.org/content/repositories/snapshots/</url>
+        <releases>
+            <enabled>false</enabled>
+        </releases>
+        <snapshots>
+            <enabled>true</enabled>
+        </snapshots>
+    </repository>
+
+    <repository>
+      <id>cdh.rcs.releases.repo</id>
+      <url>https://repository.cloudera.com/content/groups/cdh-releases-rcs</url>
+      <name>CDH Releases Repository</name>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+
+    <repository>
+      <id>cdh.snapshots.repo</id>
+      <url>https://repository.cloudera.com/content/repositories/snapshots</url>
+      <name>CDH Snapshots Repository</name>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+
+    <repository>
+      <id>cdh.repo</id>
+      <url>https://repository.cloudera.com/content/groups/cloudera-repos</url>
+      <name>Cloudera Repositories</name>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+
+    <repository>
+      <id>Codehaus repository</id>
+      <url>http://repository.codehaus.org/</url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.3</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java
new file mode 100644
index 0000000..b4a8d67
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.impala.yarn.server.resourcemanager.resource;
+//YARNUTIL: DUMMY IMPL
+
+public class ResourceWeights {
+
+  public ResourceWeights(float f) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
new file mode 100644
index 0000000..f304c6d
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: MODIFIED
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.impala.yarn.server.resourcemanager.resource.ResourceWeights;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class AllocationConfiguration {
+  private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
+  private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DefaultResourceCalculator();
+  // Minimum resource allocation for each queue
+  private final Map<String, Resource> minQueueResources;
+  // Maximum amount of resources per queue
+  @VisibleForTesting
+  final Map<String, Resource> maxQueueResources;
+
+  private final Resource queueMaxResourcesDefault;
+
+  // ACL's for each queue. Only specifies non-default ACL's from configuration.
+  private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
+
+  // Policy for mapping apps to queues
+  @VisibleForTesting
+  QueuePlacementPolicy placementPolicy;
+  
+  //Configured queues in the alloc xml
+  @VisibleForTesting
+  Map<FSQueueType, Set<String>> configuredQueues;
+
+  public AllocationConfiguration(Map<String, Resource> minQueueResources,
+      Map<String, Resource> maxQueueResources,
+      Map<String, Resource> maxChildQueueResources,
+      Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
+      Map<String, ResourceWeights> queueWeights,
+      Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
+      int queueMaxAppsDefault, Resource queueMaxResourcesDefault,
+      float queueMaxAMShareDefault,
+      Map<String, SchedulingPolicy> schedulingPolicies,
+      SchedulingPolicy defaultSchedulingPolicy,
+      Map<String, Long> minSharePreemptionTimeouts,
+      Map<String, Long> fairSharePreemptionTimeouts,
+      Map<String, Float> fairSharePreemptionThresholds,
+      Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+      QueuePlacementPolicy placementPolicy,
+      Map<FSQueueType, Set<String>> configuredQueues,
+      Set<String> nonPreemptableQueues) {
+    this.minQueueResources = minQueueResources;
+    this.maxQueueResources = maxQueueResources;
+    this.queueMaxResourcesDefault = queueMaxResourcesDefault;
+    this.queueAcls = queueAcls;
+    this.placementPolicy = placementPolicy;
+    this.configuredQueues = configuredQueues;
+  }
+  
+  public AllocationConfiguration(Configuration conf) {
+    minQueueResources = new HashMap<>();
+    maxQueueResources = new HashMap<>();
+    queueMaxResourcesDefault = Resources.unbounded();
+    queueAcls = new HashMap<>();
+    configuredQueues = new HashMap<>();
+    for (FSQueueType queueType : FSQueueType.values()) {
+      configuredQueues.put(queueType, new HashSet<String>());
+    }
+    placementPolicy =
+        QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
+  }
+  
+  /**
+   * Get the ACLs associated with this queue. If a given ACL is not explicitly
+   * configured, include the default value for that ACL.  The default for the
+   * root queue is everybody ("*") and the default for all other queues is
+   * nobody ("")
+   */
+  public AccessControlList getQueueAcl(String queue, QueueACL operation) {
+    Map<QueueACL, AccessControlList> queueAcls = this.queueAcls.get(queue);
+    if (queueAcls != null) {
+      AccessControlList operationAcl = queueAcls.get(operation);
+      if (operationAcl != null) {
+        return operationAcl;
+      }
+    }
+    return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL;
+  }
+
+  /**
+   * Get the minimum resource allocation for the given queue.
+   *
+   * @param queue the target queue's name
+   * @return the min allocation on this queue or {@link Resources#none}
+   * if not set
+   */
+  public Resource getMinResources(String queue) {
+    Resource minQueueResource = minQueueResources.get(queue);
+    return (minQueueResource == null) ? Resources.none() : minQueueResource;
+  }
+
+  /**
+   * Set the maximum resource allocation for the given queue.
+   *
+   * @param queue the target queue
+   * @param maxResource the maximum resource allocation
+   */
+  void setMaxResources(String queue, Resource maxResource) {
+    maxQueueResources.put(queue, maxResource);
+  }
+
+  /**
+   * Get the maximum resource allocation for the given queue. If the max in not
+   * set, return the larger of the min and the default max.
+   *
+   * @param queue the target queue's name
+   * @return the max allocation on this queue
+   */
+  public Resource getMaxResources(String queue) {
+    Resource maxQueueResource = maxQueueResources.get(queue);
+    if (maxQueueResource == null) {
+      Resource minQueueResource = minQueueResources.get(queue);
+      if (minQueueResource != null &&
+          Resources.greaterThan(RESOURCE_CALCULATOR, Resources.unbounded(),
+          minQueueResource, queueMaxResourcesDefault)) {
+        return minQueueResource;
+      } else {
+        return queueMaxResourcesDefault;
+      }
+    } else {
+      return maxQueueResource;
+    }
+  }
+
+  public boolean hasAccess(String queueName, QueueACL acl,
+      UserGroupInformation user) {
+    int lastPeriodIndex = queueName.length();
+    while (lastPeriodIndex != -1) {
+      String queue = queueName.substring(0, lastPeriodIndex);
+      if (getQueueAcl(queue, acl).isUserAllowed(user)) {
+        return true;
+      }
+
+      lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1);
+    }
+    
+    return false;
+  }
+
+  public Map<FSQueueType, Set<String>> getConfiguredQueues() {
+    return configuredQueues;
+  }
+  
+  public QueuePlacementPolicy getPlacementPolicy() {
+    return placementPolicy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
new file mode 100644
index 0000000..1f848fb
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: VANILLA
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Thrown when the allocation file for {@link QueueManager} is malformed.
+ */
+@Private
+@Unstable
+public class AllocationConfigurationException extends Exception {
+  private static final long serialVersionUID = 4046517047810854249L;
+
+  public AllocationConfigurationException(String message) {
+    super(message);
+  }
+
+  public AllocationConfigurationException(String message, Throwable t) {
+    super(message, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
new file mode 100644
index 0000000..ed9943a
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -0,0 +1,536 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: VANILLA
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.impala.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.annotations.VisibleForTesting;
+
+@Public
+@Unstable
+public class AllocationFileLoaderService extends AbstractService {
+  
+  public static final Log LOG = LogFactory.getLog(
+      AllocationFileLoaderService.class.getName());
+  
+  /** Time to wait between checks of the allocation file */
+  public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000;
+
+  /**
+   * Time to wait after the allocation has been modified before reloading it
+   * (this is done to prevent loading a file that hasn't been fully written).
+   */
+  public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
+
+  public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
+  private final Clock clock;
+
+  private long lastSuccessfulReload; // Last time we successfully reloaded queues
+  private boolean lastReloadAttemptFailed = false;
+  
+  // Path to XML file containing allocations. 
+  private File allocFile;
+  
+  private Listener reloadListener;
+  
+  @VisibleForTesting
+  long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
+  
+  private Thread reloadThread;
+  private volatile boolean running = true;
+  
+  public AllocationFileLoaderService() {
+    this(new SystemClock());
+  }
+  
+  public AllocationFileLoaderService(Clock clock) {
+    super(AllocationFileLoaderService.class.getName());
+    this.clock = clock;
+    
+  }
+  
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    this.allocFile = getAllocationFile(conf);
+    if (allocFile != null) {
+      reloadThread = new Thread() {
+        @Override
+        public void run() {
+          while (running) {
+            long time = clock.getTime();
+            long lastModified = allocFile.lastModified();
+            if (lastModified > lastSuccessfulReload &&
+                time > lastModified + ALLOC_RELOAD_WAIT_MS) {
+              try {
+                reloadAllocations();
+              } catch (Exception ex) {
+                if (!lastReloadAttemptFailed) {
+                  LOG.error("Failed to reload fair scheduler config file - " +
+                      "will use existing allocations.", ex);
+                }
+                lastReloadAttemptFailed = true;
+              }
+            } else if (lastModified == 0l) {
+              if (!lastReloadAttemptFailed) {
+                LOG.warn("Failed to reload fair scheduler config file because" +
+                    " last modified returned 0. File exists: "
+                    + allocFile.exists());
+              }
+              lastReloadAttemptFailed = true;
+            }
+            try {
+              Thread.sleep(reloadIntervalMs);
+            } catch (InterruptedException ex) {
+              LOG.info(
+                  "Interrupted while waiting to reload alloc configuration");
+            }
+          }
+        }
+      };
+      reloadThread.setName("AllocationFileReloader");
+      reloadThread.setDaemon(true);
+    }
+    super.serviceInit(conf);
+  }
+  
+  @Override
+  public void serviceStart() throws Exception {
+    if (reloadThread != null) {
+      reloadThread.start();
+    }
+    super.serviceStart();
+  }
+  
+  @Override
+  public void serviceStop() throws Exception {
+    running = false;
+    if (reloadThread != null) {
+      reloadThread.interrupt();
+      try {
+        reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        LOG.warn("reloadThread fails to join.");
+      }
+    }
+    super.serviceStop();
+  }
+  
+  /**
+   * Path to XML file containing allocations. If the
+   * path is relative, it is searched for in the
+   * classpath, but loaded like a regular File.
+   */
+  public File getAllocationFile(Configuration conf) {
+    String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
+        FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);
+    File allocFile = new File(allocFilePath);
+    if (!allocFile.isAbsolute()) {
+      URL url = Thread.currentThread().getContextClassLoader()
+          .getResource(allocFilePath);
+      if (url == null) {
+        LOG.warn(allocFilePath + " not found on the classpath.");
+        allocFile = null;
+      } else if (!url.getProtocol().equalsIgnoreCase("file")) {
+        throw new RuntimeException("Allocation file " + url
+            + " found on the classpath is not on the local filesystem.");
+      } else {
+        allocFile = new File(url.getPath());
+      }
+    }
+    return allocFile;
+  }
+  
+  public synchronized void setReloadListener(Listener reloadListener) {
+    this.reloadListener = reloadListener;
+  }
+  
+  /**
+   * Updates the allocation list from the allocation config file. This file is
+   * expected to be in the XML format specified in the design doc.
+   *
+   * @throws IOException if the config file cannot be read.
+   * @throws AllocationConfigurationException if allocations are invalid.
+   * @throws ParserConfigurationException if XML parser is misconfigured.
+   * @throws SAXException if config file is malformed.
+   */
+  public synchronized void reloadAllocations() throws IOException,
+      ParserConfigurationException, SAXException,
+      AllocationConfigurationException {
+    if (allocFile == null) {
+      return;
+    }
+    LOG.info("Loading allocation file " + allocFile);
+    // Create some temporary hashmaps to hold the new allocs, and we only save
+    // them in our fields if we have parsed the entire allocs file successfully.
+    Map<String, Resource> minQueueResources = new HashMap<>();
+    Map<String, Resource> maxQueueResources = new HashMap<>();
+    Map<String, Resource> maxChildQueueResources = new HashMap<>();
+    Map<String, Integer> queueMaxApps = new HashMap<>();
+    Map<String, Integer> userMaxApps = new HashMap<>();
+    Map<String, Float> queueMaxAMShares = new HashMap<>();
+    Map<String, ResourceWeights> queueWeights = new HashMap<>();
+    Map<String, SchedulingPolicy> queuePolicies = new HashMap<>();
+    Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
+    Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
+    Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
+    Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
+    Set<String> nonPreemptableQueues = new HashSet<>();
+    int userMaxAppsDefault = Integer.MAX_VALUE;
+    int queueMaxAppsDefault = Integer.MAX_VALUE;
+    Resource queueMaxResourcesDefault = Resources.unbounded();
+    float queueMaxAMShareDefault = 0.5f;
+    long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
+    long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+    float defaultFairSharePreemptionThreshold = 0.5f;
+    SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
+
+    QueuePlacementPolicy newPlacementPolicy = null;
+
+    // Remember all queue names so we can display them on web UI, etc.
+    // configuredQueues is segregated based on whether it is a leaf queue
+    // or a parent queue. This information is used for creating queues
+    // and also for making queue placement decisions(QueuePlacementRule.java).
+    Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>();
+
+    for (FSQueueType queueType : FSQueueType.values()) {
+      configuredQueues.put(queueType, new HashSet<String>());
+    }
+
+    // Read and parse the allocations file.
+    DocumentBuilderFactory docBuilderFactory =
+      DocumentBuilderFactory.newInstance();
+    docBuilderFactory.setIgnoringComments(true);
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document doc = builder.parse(allocFile);
+    Element root = doc.getDocumentElement();
+    if (!"allocations".equals(root.getTagName()))
+      throw new AllocationConfigurationException("Bad fair scheduler config " +
+          "file: top-level element not <allocations>");
+    NodeList elements = root.getChildNodes();
+    List<Element> queueElements = new ArrayList<Element>();
+    Element placementPolicyElement = null;
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (node instanceof Element) {
+        Element element = (Element)node;
+        if ("queue".equals(element.getTagName()) ||
+          "pool".equals(element.getTagName())) {
+          queueElements.add(element);
+        } else if ("user".equals(element.getTagName())) {
+          String userName = element.getAttribute("name");
+          NodeList fields = element.getChildNodes();
+          for (int j = 0; j < fields.getLength(); j++) {
+            Node fieldNode = fields.item(j);
+            if (!(fieldNode instanceof Element))
+              continue;
+            Element field = (Element) fieldNode;
+            if ("maxRunningApps".equals(field.getTagName())) {
+              String text = ((Text)field.getFirstChild()).getData().trim();
+              int val = Integer.parseInt(text);
+              userMaxApps.put(userName, val);
+            }
+          }
+        } else if ("queueMaxResourcesDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          Resource val =
+              FairSchedulerConfiguration.parseResourceConfigValue(text);
+          queueMaxResourcesDefault = val;
+        } else if ("userMaxAppsDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          int val = Integer.parseInt(text);
+          userMaxAppsDefault = val;
+        } else if ("defaultFairSharePreemptionTimeout"
+            .equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          long val = Long.parseLong(text) * 1000L;
+          defaultFairSharePreemptionTimeout = val;
+        } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
+          if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) {
+            String text = ((Text)element.getFirstChild()).getData().trim();
+            long val = Long.parseLong(text) * 1000L;
+            defaultFairSharePreemptionTimeout = val;
+          }
+        } else if ("defaultMinSharePreemptionTimeout"
+            .equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          long val = Long.parseLong(text) * 1000L;
+          defaultMinSharePreemptionTimeout = val;
+        } else if ("defaultFairSharePreemptionThreshold"
+            .equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          float val = Float.parseFloat(text);
+          val = Math.max(Math.min(val, 1.0f), 0.0f);
+          defaultFairSharePreemptionThreshold = val;
+        } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          int val = Integer.parseInt(text);
+          queueMaxAppsDefault = val;
+        } else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          float val = Float.parseFloat(text);
+          val = Math.min(val, 1.0f);
+          queueMaxAMShareDefault = val;
+        } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
+            || "defaultQueueSchedulingMode".equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          if (text.equalsIgnoreCase("FIFO")) {
+            throw new AllocationConfigurationException("Bad fair scheduler "
+              + "config file: defaultQueueSchedulingPolicy or "
+              + "defaultQueueSchedulingMode can't be FIFO.");
+          }
+          defaultSchedPolicy = SchedulingPolicy.parse(text);
+        } else if ("queuePlacementPolicy".equals(element.getTagName())) {
+          placementPolicyElement = element;
+        } else {
+          LOG.warn("Bad element in allocations file: " + element.getTagName());
+        }
+      }
+    }
+
+    // Load queue elements.  A root queue can either be included or omitted.  If
+    // it's included, all other queues must be inside it.
+    for (Element element : queueElements) {
+      String parent = "root";
+      if (element.getAttribute("name").equalsIgnoreCase("root")) {
+        if (queueElements.size() > 1) {
+          throw new AllocationConfigurationException("If configuring root queue,"
+              + " no other queues can be placed alongside it.");
+        }
+        parent = null;
+      }
+      loadQueue(parent, element, minQueueResources, maxQueueResources,
+          maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
+          queueWeights, queuePolicies, minSharePreemptionTimeouts,
+          fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
+          queueAcls, configuredQueues, nonPreemptableQueues);
+    }
+
+    // Load placement policy and pass it configured queues
+    Configuration conf = getConfig();
+    if (placementPolicyElement != null) {
+      newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
+          configuredQueues, conf);
+    } else {
+      newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
+          configuredQueues);
+    }
+
+    // Set the min/fair share preemption timeout for the root queue
+    if (!minSharePreemptionTimeouts.containsKey("root")){
+      minSharePreemptionTimeouts.put("root",
+          defaultMinSharePreemptionTimeout);
+    }
+    if (!fairSharePreemptionTimeouts.containsKey("root")) {
+      fairSharePreemptionTimeouts.put("root",
+          defaultFairSharePreemptionTimeout);
+    }
+
+    // Set the fair share preemption threshold for the root queue
+    if (!fairSharePreemptionThresholds.containsKey("root")) {
+      fairSharePreemptionThresholds.put("root",
+          defaultFairSharePreemptionThreshold);
+    }
+
+    AllocationConfiguration info =
+        new AllocationConfiguration(minQueueResources, maxQueueResources,
+        maxChildQueueResources, queueMaxApps, userMaxApps, queueWeights,
+        queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
+        queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
+        defaultSchedPolicy, minSharePreemptionTimeouts,
+        fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
+        newPlacementPolicy, configuredQueues, nonPreemptableQueues);
+    lastSuccessfulReload = clock.getTime();
+    lastReloadAttemptFailed = false;
+
+    reloadListener.onReload(info);
+  }
+  
+  /**
+   * Loads a queue from a queue element in the configuration file
+   */
+  private void loadQueue(String parentName, Element element,
+      Map<String, Resource> minQueueResources,
+      Map<String, Resource> maxQueueResources,
+      Map<String, Resource> maxChildQueueResources,
+      Map<String, Integer> queueMaxApps,
+      Map<String, Integer> userMaxApps,
+      Map<String, Float> queueMaxAMShares,
+      Map<String, ResourceWeights> queueWeights,
+      Map<String, SchedulingPolicy> queuePolicies,
+      Map<String, Long> minSharePreemptionTimeouts,
+      Map<String, Long> fairSharePreemptionTimeouts,
+      Map<String, Float> fairSharePreemptionThresholds,
+      Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+      Map<FSQueueType, Set<String>> configuredQueues,
+      Set<String> nonPreemptableQueues)
+      throws AllocationConfigurationException {
+    String queueName = CharMatcher.WHITESPACE.trimFrom(
+        element.getAttribute("name"));
+
+    if (queueName.contains(".")) {
+      throw new AllocationConfigurationException("Bad fair scheduler config "
+          + "file: queue name (" + queueName + ") shouldn't contain period.");
+    }
+
+    if (queueName.isEmpty()) {
+      throw new AllocationConfigurationException("Bad fair scheduler config "
+          + "file: queue name shouldn't be empty or "
+          + "consist only of whitespace.");
+    }
+
+    if (parentName != null) {
+      queueName = parentName + "." + queueName;
+    }
+
+    Map<QueueACL, AccessControlList> acls = new HashMap<>();
+    NodeList fields = element.getChildNodes();
+    boolean isLeaf = true;
+
+    for (int j = 0; j < fields.getLength(); j++) {
+      Node fieldNode = fields.item(j);
+      if (!(fieldNode instanceof Element))
+        continue;
+      Element field = (Element) fieldNode;
+      if ("minResources".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        Resource val =
+            FairSchedulerConfiguration.parseResourceConfigValue(text);
+        minQueueResources.put(queueName, val);
+      } else if ("maxResources".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        Resource val =
+            FairSchedulerConfiguration.parseResourceConfigValue(text);
+        maxQueueResources.put(queueName, val);
+      } else if ("maxChildResources".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        Resource val =
+            FairSchedulerConfiguration.parseResourceConfigValue(text);
+        maxChildQueueResources.put(queueName, val);
+      } else if ("maxRunningApps".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        int val = Integer.parseInt(text);
+        queueMaxApps.put(queueName, val);
+      } else if ("maxAMShare".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        float val = Float.parseFloat(text);
+        val = Math.min(val, 1.0f);
+        queueMaxAMShares.put(queueName, val);
+      } else if ("weight".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        double val = Double.parseDouble(text);
+        queueWeights.put(queueName, new ResourceWeights((float)val));
+      } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        long val = Long.parseLong(text) * 1000L;
+        minSharePreemptionTimeouts.put(queueName, val);
+      } else if ("fairSharePreemptionTimeout".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        long val = Long.parseLong(text) * 1000L;
+        fairSharePreemptionTimeouts.put(queueName, val);
+      } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        float val = Float.parseFloat(text);
+        val = Math.max(Math.min(val, 1.0f), 0.0f);
+        fairSharePreemptionThresholds.put(queueName, val);
+      } else if ("schedulingPolicy".equals(field.getTagName())
+          || "schedulingMode".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        SchedulingPolicy policy = SchedulingPolicy.parse(text);
+        queuePolicies.put(queueName, policy);
+      } else if ("aclSubmitApps".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData();
+        acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+      } else if ("aclAdministerApps".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData();
+        acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+      } else if ("allowPreemptionFrom".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        if (!Boolean.parseBoolean(text)) {
+          nonPreemptableQueues.add(queueName);
+        }
+      } else if ("queue".endsWith(field.getTagName()) || 
+          "pool".equals(field.getTagName())) {
+        loadQueue(queueName, field, minQueueResources, maxQueueResources,
+            maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
+            queueWeights, queuePolicies, minSharePreemptionTimeouts,
+            fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
+            queueAcls, configuredQueues, nonPreemptableQueues);
+        configuredQueues.get(FSQueueType.PARENT).add(queueName);
+        isLeaf = false;
+      }
+    }
+    if (isLeaf) {
+      // if a leaf in the alloc file is marked as type='parent'
+      // then store it under 'parent'
+      if ("parent".equals(element.getAttribute("type"))) {
+        configuredQueues.get(FSQueueType.PARENT).add(queueName);
+      } else {
+        configuredQueues.get(FSQueueType.LEAF).add(queueName);
+      }
+    }
+    queueAcls.put(queueName, acls);
+    if (maxQueueResources.containsKey(queueName) &&
+        minQueueResources.containsKey(queueName)
+        && !Resources.fitsIn(minQueueResources.get(queueName),
+            maxQueueResources.get(queueName))) {
+      LOG.warn(
+          String.format("Queue %s has max resources %s less than "
+              + "min resources %s", queueName, maxQueueResources.get(queueName),
+              minQueueResources.get(queueName)));
+    }
+  }
+  
+  public interface Listener {
+    public void onReload(AllocationConfiguration info);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java
new file mode 100644
index 0000000..ba8912d
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: VANILLA
+
+public enum FSQueueType {
+  /*
+   * Represents a leaf queue 
+   */
+  LEAF, 
+  
+  /*
+   * Represents a parent queue
+   */
+  PARENT
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
new file mode 100644
index 0000000..572ef74
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: MODIFIED
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.impala.yarn.server.utils.BuilderUtils;
+
+public class FairSchedulerConfiguration extends Configuration {
+
+  private static final String CONF_PREFIX = "yarn.scheduler.fair.";
+
+  public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
+  protected static final String DEFAULT_ALLOCATION_FILE = "fair-scheduler.xml";
+
+  /** Whether pools can be created that were not specified in the FS configuration file
+   */
+  protected static final String ALLOW_UNDECLARED_POOLS = CONF_PREFIX + "allow-undeclared-pools";
+  protected static final boolean DEFAULT_ALLOW_UNDECLARED_POOLS = true;
+
+  /**
+   * Whether to use the user name as the queue name (instead of "default") if
+   * the request does not specify a queue.
+   */
+  protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
+  protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
+
+  /**
+   * Parses a resource config value of a form like "1024", "1024 mb",
+   * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.
+   *
+   * @throws AllocationConfigurationException
+   */
+  public static Resource parseResourceConfigValue(String val)
+      throws AllocationConfigurationException {
+    try {
+      val = val.toLowerCase();
+      int memory = findResource(val, "mb");
+      int vcores = findResource(val, "vcores");
+      return BuilderUtils.newResource(memory, vcores);
+    } catch (AllocationConfigurationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new AllocationConfigurationException(
+          "Error reading resource config", ex);
+    }
+  }
+
+  private static int findResource(String val, String units)
+    throws AllocationConfigurationException {
+    Pattern pattern = Pattern.compile("(\\d+)(\\.\\d*)?\\s*" + units);
+    Matcher matcher = pattern.matcher(val);
+    if (!matcher.find()) {
+      throw new AllocationConfigurationException("Missing resource: " + units);
+    }
+    return Integer.parseInt(matcher.group(1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
new file mode 100644
index 0000000..74c690c
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: VANILLA
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+@Private
+@Unstable
+public class QueuePlacementPolicy {
+  private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses;
+  static {
+    Map<String, Class<? extends QueuePlacementRule>> map =
+        new HashMap<String, Class<? extends QueuePlacementRule>>();
+    map.put("user", QueuePlacementRule.User.class);
+    map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
+    map.put("secondaryGroupExistingQueue",
+        QueuePlacementRule.SecondaryGroupExistingQueue.class);
+    map.put("specified", QueuePlacementRule.Specified.class);
+    map.put("nestedUserQueue",
+        QueuePlacementRule.NestedUserQueue.class);
+    map.put("default", QueuePlacementRule.Default.class);
+    map.put("reject", QueuePlacementRule.Reject.class);
+    ruleClasses = Collections.unmodifiableMap(map);
+  }
+  
+  private final List<QueuePlacementRule> rules;
+  private final Map<FSQueueType, Set<String>> configuredQueues;
+  private final Groups groups;
+  
+  public QueuePlacementPolicy(List<QueuePlacementRule> rules,
+      Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+      throws AllocationConfigurationException {
+    for (int i = 0; i < rules.size()-1; i++) {
+      if (rules.get(i).isTerminal()) {
+        throw new AllocationConfigurationException("Rules after rule "
+            + i + " in queue placement policy can never be reached");
+      }
+    }
+    if (!rules.get(rules.size()-1).isTerminal()) {
+      throw new AllocationConfigurationException(
+          "Could get past last queue placement rule without assigning");
+    }
+    this.rules = rules;
+    this.configuredQueues = configuredQueues;
+    groups = new Groups(conf);
+  }
+  
+  /**
+   * Builds a QueuePlacementPolicy from an xml element.
+   */
+  public static QueuePlacementPolicy fromXml(Element el,
+      Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+      throws AllocationConfigurationException {
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    NodeList elements = el.getChildNodes();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (node instanceof Element) {
+        QueuePlacementRule rule = createAndInitializeRule(node);
+        rules.add(rule);
+      }
+    }
+    return new QueuePlacementPolicy(rules, configuredQueues, conf);
+  }
+  
+  /**
+   * Create and initialize a rule given a xml node
+   * @param node
+   * @return QueuePlacementPolicy
+   * @throws AllocationConfigurationException
+   */
+  public static QueuePlacementRule createAndInitializeRule(Node node)
+      throws AllocationConfigurationException {
+    Element element = (Element) node;
+
+    String ruleName = element.getAttribute("name");
+    if ("".equals(ruleName)) {
+      throw new AllocationConfigurationException("No name provided for a "
+          + "rule element");
+    }
+
+    Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
+    if (clazz == null) {
+      throw new AllocationConfigurationException("No rule class found for "
+          + ruleName);
+    }
+    QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
+    rule.initializeFromXml(element);
+    return rule;
+  }
+    
+  /**
+   * Build a simple queue placement policy from the allow-undeclared-pools and
+   * user-as-default-queue configuration options.
+   */
+  public static QueuePlacementPolicy fromConfiguration(Configuration conf,
+      Map<FSQueueType, Set<String>> configuredQueues) {
+    boolean create = conf.getBoolean(
+        FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
+        FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
+    boolean userAsDefaultQueue = conf.getBoolean(
+        FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
+        FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE);
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+    if (userAsDefaultQueue) {
+      rules.add(new QueuePlacementRule.User().initialize(create, null));
+    }
+    if (!userAsDefaultQueue || !create) {
+      rules.add(new QueuePlacementRule.Default().initialize(true, null));
+    }
+    try {
+      return new QueuePlacementPolicy(rules, configuredQueues, conf);
+    } catch (AllocationConfigurationException ex) {
+      throw new RuntimeException("Should never hit exception when loading" +
+          "placement policy from conf", ex);
+    }
+  }
+
+  /**
+   * Applies this rule to an app with the given requested queue and user/group
+   * information.
+   * 
+   * @param requestedQueue
+   *    The queue specified in the ApplicationSubmissionContext
+   * @param user
+   *    The user submitting the app
+   * @return
+   *    The name of the queue to assign the app to.  Or null if the app should
+   *    be rejected.
+   * @throws IOException
+   *    If an exception is encountered while getting the user's groups
+   */
+  public String assignAppToQueue(String requestedQueue, String user)
+      throws IOException {
+    for (QueuePlacementRule rule : rules) {
+      String queue = rule.assignAppToQueue(requestedQueue, user, groups,
+          configuredQueues);
+      if (queue == null || !queue.isEmpty()) {
+        return queue;
+      }
+    }
+    throw new IllegalStateException("Should have applied a rule before " +
+        "reaching here");
+  }
+  
+  public List<QueuePlacementRule> getRules() {
+    return rules;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
new file mode 100644
index 0000000..4be9f73
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+
+//YARNUTIL: VANILLA
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
+public abstract class QueuePlacementRule {
+  protected boolean create;
+  public static final Log LOG =
+      LogFactory.getLog(QueuePlacementRule.class.getName());
+
+  /**
+   * Initializes the rule with any arguments.
+   * 
+   * @param args
+   *    Additional attributes of the rule's xml element other than create.
+   */
+  public QueuePlacementRule initialize(boolean create, Map<String, String> args) {
+    this.create = create;
+    return this;
+  }
+  
+  /**
+   * 
+   * @param requestedQueue
+   *    The queue explicitly requested.
+   * @param user
+   *    The user submitting the app.
+   * @param groups
+   *    The groups of the user submitting the app.
+   * @param configuredQueues
+   *    The queues specified in the scheduler configuration.
+   * @return
+   *    The queue to place the app into. An empty string indicates that we should
+   *    continue to the next rule, and null indicates that the app should be rejected.
+   */
+  public String assignAppToQueue(String requestedQueue, String user,
+      Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+      throws IOException {
+   String queue = getQueueForApp(requestedQueue, user, groups,
+        configuredQueues);
+    if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
+        || configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
+      return queue;
+    } else {
+      return "";
+    }
+  }
+  
+  public void initializeFromXml(Element el)
+      throws AllocationConfigurationException {
+    boolean create = true;
+    NamedNodeMap attributes = el.getAttributes();
+    Map<String, String> args = new HashMap<String, String>();
+    for (int i = 0; i < attributes.getLength(); i++) {
+      Node node = attributes.item(i);
+      String key = node.getNodeName();
+      String value = node.getNodeValue();
+      if (key.equals("create")) {
+        create = Boolean.parseBoolean(value);
+      } else {
+        args.put(key, value);
+      }
+    }
+    initialize(create, args);
+  }
+  
+  /**
+   * Returns true if this rule never tells the policy to continue.
+   */
+  public abstract boolean isTerminal();
+  
+  /**
+   * Applies this rule to an app with the given requested queue and user/group
+   * information.
+   * 
+   * @param requestedQueue
+   *    The queue specified in the ApplicationSubmissionContext
+   * @param user
+   *    The user submitting the app.
+   * @param groups
+   *    The groups of the user submitting the app.
+   * @return
+   *    The name of the queue to assign the app to, or null to empty string
+   *    continue to the next rule.
+   */
+  protected abstract String getQueueForApp(String requestedQueue, String user,
+      Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+      throws IOException;
+
+  /**
+   * Places apps in queues by username of the submitter
+   */
+  public static class User extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+      return "root." + cleanName(user);
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+  
+  /**
+   * Places apps in queues by primary group of the submitter
+   */
+  public static class PrimaryGroup extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+        throws IOException {
+      final List<String> groupList = groups.getGroups(user);
+      if (groupList.isEmpty()) {
+        throw new IOException("No groups returned for user " + user);
+      }
+      return "root." + cleanName(groupList.get(0));
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return create;
+    }
+  }
+  
+  /**
+   * Places apps in queues by secondary group of the submitter
+   * 
+   * Match will be made on first secondary group that exist in
+   * queues
+   */
+  public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+        throws IOException {
+      List<String> groupNames = groups.getGroups(user);
+      for (int i = 1; i < groupNames.size(); i++) {
+        String group = cleanName(groupNames.get(i));
+        if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
+            || configuredQueues.get(FSQueueType.PARENT).contains(
+                "root." + group)) {
+          return "root." + group;
+        }
+      }
+      
+      return "";
+    }
+        
+    @Override
+    public boolean isTerminal() {
+      return false;
+    }
+  }
+
+  /**
+   * Places apps in queues with name of the submitter under the queue
+   * returned by the nested rule.
+   */
+  public static class NestedUserQueue extends QueuePlacementRule {
+    @VisibleForTesting
+    QueuePlacementRule nestedRule;
+
+    /**
+     * Parse xml and instantiate the nested rule 
+     */
+    @Override
+    public void initializeFromXml(Element el)
+        throws AllocationConfigurationException {
+      NodeList elements = el.getChildNodes();
+
+      for (int i = 0; i < elements.getLength(); i++) {
+        Node node = elements.item(i);
+        if (node instanceof Element) {
+          Element element = (Element) node;
+          if ("rule".equals(element.getTagName())) {
+            QueuePlacementRule rule = QueuePlacementPolicy
+                .createAndInitializeRule(node);
+            if (rule == null) {
+              throw new AllocationConfigurationException(
+                  "Unable to create nested rule in nestedUserQueue rule");
+            }
+            this.nestedRule = rule;
+            break;
+          } else {
+            continue;
+          }
+        }
+      }
+
+      if (this.nestedRule == null) {
+        throw new AllocationConfigurationException(
+            "No nested rule specified in <nestedUserQueue> rule");
+      }
+      super.initializeFromXml(el);
+    }
+    
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+        throws IOException {
+      // Apply the nested rule
+      String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
+          groups, configuredQueues);
+      
+      if (queueName != null && queueName.length() != 0) {
+        if (!queueName.startsWith("root.")) {
+          queueName = "root." + queueName;
+        }
+        
+        // Verify if the queue returned by the nested rule is an configured leaf queue,
+        // if yes then skip to next rule in the queue placement policy
+        if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
+          return "";
+        }
+        return queueName + "." + cleanName(user);
+      }
+      return queueName;
+    }
+
+    @Override
+    public boolean isTerminal() {
+      return false;
+    }
+  }
+  
+  /**
+   * Places apps in queues by requested queue of the submitter
+   */
+  public static class Specified extends QueuePlacementRule {
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+      if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
+        return "";
+      } else {
+        if (!requestedQueue.startsWith("root.")) {
+          requestedQueue = "root." + requestedQueue;
+        }
+        return requestedQueue;
+      }
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return false;
+    }
+  }
+  
+  /**
+   * Places apps in the specified default queue. If no default queue is
+   * specified the app is placed in root.default queue.
+   */
+  public static class Default extends QueuePlacementRule {
+    @VisibleForTesting
+    String defaultQueueName;
+
+    @Override
+    public QueuePlacementRule initialize(boolean create,
+        Map<String, String> args) {
+      if (defaultQueueName == null) {
+        defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+      }
+      return super.initialize(create, args);
+    }
+    
+    @Override
+    public void initializeFromXml(Element el)
+        throws AllocationConfigurationException {
+      defaultQueueName = el.getAttribute("queue");
+      if (defaultQueueName != null && !defaultQueueName.isEmpty()) {
+        if (!defaultQueueName.startsWith("root.")) {
+          defaultQueueName = "root." + defaultQueueName;
+        }
+      } else {
+        defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+      }
+      super.initializeFromXml(el);
+    }
+
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+      return defaultQueueName;
+    }
+
+    @Override
+    public boolean isTerminal() {
+      return true;
+    }
+  }
+
+  /**
+   * Rejects all apps
+   */
+  public static class Reject extends QueuePlacementRule {
+    @Override
+    public String assignAppToQueue(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+      return null;
+    }
+    
+    @Override
+    protected String getQueueForApp(String requestedQueue, String user,
+        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public boolean isTerminal() {
+      return true;
+    }
+  }
+
+  /**
+   * Replace the periods in the username or groupname with "_dot_" and
+   * remove trailing and leading whitespace.
+   */
+  protected String cleanName(String name) {
+    name = name.trim();
+    if (name.contains(".")) {
+      String converted = name.replaceAll("\\.", "_dot_");
+      LOG.warn("Name " + name + " is converted to " + converted
+          + " when it is used as a queue name.");
+      return converted;
+    } else {
+      return name;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
new file mode 100644
index 0000000..0d19614
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: DUMMY IMPL
+
+public class SchedulingPolicy {
+  public static final SchedulingPolicy DEFAULT_POLICY = null;
+
+  public static SchedulingPolicy parse(String s) {
+    return DEFAULT_POLICY;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java
new file mode 100644
index 0000000..e62bc25
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.impala.yarn.server.utils;
+//YARNUTIL: MODIFIED
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * Builder utilities to construct various objects.
+ *
+ */
+public class BuilderUtils {
+
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  public static Resource newResource(int memory, int vCores) {
+    Resource resource = recordFactory.newRecordInstance(Resource.class);
+    resource.setMemory(memory);
+    resource.setVirtualCores(vCores);
+    return resource;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/fe/CMakeLists.txt b/fe/CMakeLists.txt
index eee1601..d5c7c1e 100644
--- a/fe/CMakeLists.txt
+++ b/fe/CMakeLists.txt
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_custom_target(fe ALL DEPENDS thrift-deps fb-deps function-registry ext-data-source
+add_custom_target(fe ALL DEPENDS
+  thrift-deps fb-deps yarn-extras function-registry ext-data-source
   COMMAND $ENV{IMPALA_HOME}/bin/mvn-quiet.sh install -DskipTests
 )

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index 0f7ae8c..a0330d7 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -47,6 +47,7 @@ under the License.
     <impala.extdatasrc.api.version>1.0-SNAPSHOT</impala.extdatasrc.api.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <kudu.version>${env.KUDU_JAVA_VERSION}</kudu.version>
+    <yarn-extras.version>${project.version}</yarn-extras.version>
     <eclipse.output.directory>eclipse-classes</eclipse.output.directory>
   </properties>
 
@@ -98,24 +99,18 @@ under the License.
     </dependency>
 
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-api</artifactId>
-      <version>${hadoop.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-      <version>${hadoop.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.sentry</groupId>
       <artifactId>sentry-core-common</artifactId>
       <version>${sentry.version}</version>
     </dependency>
 
     <dependency>
+      <groupId>org.apache.impala</groupId>
+      <artifactId>yarn-extras</artifactId>
+      <version>${yarn-extras.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.sentry</groupId>
       <artifactId>sentry-core-model-db</artifactId>
       <version>${sentry.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
index 9976623..5cea0e1 100644
--- a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
+++ b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
@@ -28,9 +28,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -49,6 +46,9 @@ import org.apache.impala.thrift.TResolveRequestPoolParams;
 import org.apache.impala.thrift.TResolveRequestPoolResult;
 import org.apache.impala.thrift.TStatus;
 import org.apache.impala.util.FileWatchService.FileChangeListener;
+import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
+import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
index e849a81..e84a9bc 100644
--- a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
+++ b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
@@ -22,16 +22,25 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URISyntaxException;
 
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
+
+import org.apache.impala.authorization.User;
 import org.apache.impala.common.ByteUnits;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TPoolConfig;
 import org.apache.impala.thrift.TResolveRequestPoolParams;
@@ -109,6 +118,18 @@ public class TestRequestPoolService {
     poolService_.start();
   }
 
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    RuntimeEnv.INSTANCE.setTestEnv(true);
+    User.setRulesForTesting(
+        new Configuration().get(HADOOP_SECURITY_AUTH_TO_LOCAL, "DEFAULT"));
+  }
+
+  @AfterClass
+  public static void cleanUpClass() {
+    RuntimeEnv.INSTANCE.reset();
+  }
+
   @After
   public void cleanUp() throws Exception {
     if (poolService_ != null) poolService_.stop();