You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/09/19 21:35:25 UTC
incubator-impala git commit: IMPALA-5920: Remove admission control
dependency on YARN RM jar
Repository: incubator-impala
Updated Branches:
refs/heads/master 1e63ff843 -> 8ad6d0331
IMPALA-5920: Remove admission control dependency on YARN RM jar
Impala's admission controller relies on the YARN
fair-scheduler.xml for configuration. That configuration is
loaded using YARN directly (ie. as a library by the
frontend). In Hadoop 3, a number of changes were made to the
YARN resourcemanager which break Impala. While we eventually
want to rethink the admission control configuration
(IMPALA-4159), in the meantime we at least should avoid
using unsupported YARN APIs.
This patch removes the fe dependency on the YARN artifact
'hadoop-yarn-server-resourcemanager' which contains private
APIs and isn't meant to be used as a library.
A subset of the required code has been added in
'common/yarn-extras', taken from Hadoop 2.6.0 in CDH, e.g.
see [1]. The code is added in packages 'org.apache.impala.*'
instead of 'org.apache.yarn.*'. Some code could be copied
as-is, those files are marked with the comment:
//YARNUTIL: VANILLA
Files that required some modifications are marked with:
//YARNUTIL: MODIFIED
Or, if all code except a dummy interface could be added:
//YARNUTIL: DUMMY IMPL
The goal the 'yarn-extras' is to make Impala's handling of
the AC configuration self-sufficient, i.e. it shouldn't
matter what version of Hadoop exists. As-is, this was tested
and found to work when Hadoop 2.6 is installed. Because
the yarn-extras/pom.xml still references hadoop-common,
hadoop-yarn-common, and hadoop-yarn-api, additional testing
will be required to ensure Impala using yarn-extras works
when installed along side Hadoop 3.
That testing for Hadoop 3 will be done later. Future changes
will make any other changes required for existing code to
work when Hadoop 3 is installed.
Testing:
* Ran existing tests on master.
1: https://www.cloudera.com/documentation/enterprise/release-notes/topics/cm_vd_cdh_package_tarball_512.html
Change-Id: I7efdd8ebea298836ca2a82c0a4ae037ac9285bcf
Reviewed-on: http://gerrit.cloudera.org:8080/8035
Reviewed-by: Matthew Jacobs <mj...@apache.org>
Tested-by: Matthew Jacobs <mj...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8ad6d033
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8ad6d033
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8ad6d033
Branch: refs/heads/master
Commit: 8ad6d03310825418b6aa1d427200b07bc8bdb0bc
Parents: 1e63ff8
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Mon Sep 11 10:53:07 2017 -0700
Committer: Matthew Jacobs <mj...@apache.org>
Committed: Tue Sep 19 21:32:07 2017 +0000
----------------------------------------------------------------------
CMakeLists.txt | 1 +
common/yarn-extras/CMakeLists.txt | 20 +
common/yarn-extras/README.txt | 5 +
common/yarn-extras/pom.xml | 118 ++++
.../resource/ResourceWeights.java | 26 +
.../scheduler/fair/AllocationConfiguration.java | 184 +++++++
.../fair/AllocationConfigurationException.java | 40 ++
.../fair/AllocationFileLoaderService.java | 536 +++++++++++++++++++
.../scheduler/fair/FSQueueType.java | 32 ++
.../fair/FairSchedulerConfiguration.java | 77 +++
.../scheduler/fair/QueuePlacementPolicy.java | 181 +++++++
.../scheduler/fair/QueuePlacementRule.java | 367 +++++++++++++
.../scheduler/fair/SchedulingPolicy.java | 28 +
.../impala/yarn/server/utils/BuilderUtils.java | 41 ++
fe/CMakeLists.txt | 3 +-
fe/pom.xml | 19 +-
.../apache/impala/util/RequestPoolService.java | 6 +-
.../impala/util/TestRequestPoolService.java | 23 +-
18 files changed, 1690 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e8a2355..d60487f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -346,6 +346,7 @@ include_directories(SYSTEM ${KUDU_CLIENT_INCLUDE_DIR})
add_subdirectory(common/function-registry)
add_subdirectory(common/thrift)
add_subdirectory(common/fbs)
+add_subdirectory(common/yarn-extras)
add_subdirectory(be)
add_subdirectory(fe)
add_subdirectory(ext-data-source)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/yarn-extras/CMakeLists.txt b/common/yarn-extras/CMakeLists.txt
new file mode 100644
index 0000000..81144c3
--- /dev/null
+++ b/common/yarn-extras/CMakeLists.txt
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_custom_target(yarn-extras ALL
+ COMMAND $ENV{IMPALA_HOME}/bin/mvn-quiet.sh install -DskipTests
+)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/README.txt
----------------------------------------------------------------------
diff --git a/common/yarn-extras/README.txt b/common/yarn-extras/README.txt
new file mode 100644
index 0000000..543c596
--- /dev/null
+++ b/common/yarn-extras/README.txt
@@ -0,0 +1,5 @@
+Extra Hadoop classes from Yarn needed by Impala.
+
+This is necessary because Impala has an admission controller that is configured using the
+same configuration as Yarn (i.e. a fair-scheduler.xml). Some Yarn classes are used to
+provide user to pool resolution, authorization, and accessing pool configurations.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/pom.xml
----------------------------------------------------------------------
diff --git a/common/yarn-extras/pom.xml b/common/yarn-extras/pom.xml
new file mode 100644
index 0000000..8f3ba4d
--- /dev/null
+++ b/common/yarn-extras/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.impala</groupId>
+ <artifactId>yarn-extras</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <name>YARN Extras</name>
+ <description>Extra Hadoop classes from YARN needed by Impala</description>
+ <packaging>jar</packaging>
+ <url>.</url>
+
+ <properties>
+ <hadoop.version>${env.IMPALA_HADOOP_VERSION}</hadoop.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+
+ <repository>
+ <id>cdh.rcs.releases.repo</id>
+ <url>https://repository.cloudera.com/content/groups/cdh-releases-rcs</url>
+ <name>CDH Releases Repository</name>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+
+ <repository>
+ <id>cdh.snapshots.repo</id>
+ <url>https://repository.cloudera.com/content/repositories/snapshots</url>
+ <name>CDH Snapshots Repository</name>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+
+ <repository>
+ <id>cdh.repo</id>
+ <url>https://repository.cloudera.com/content/groups/cloudera-repos</url>
+ <name>Cloudera Repositories</name>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+
+ <repository>
+ <id>Codehaus repository</id>
+ <url>http://repository.codehaus.org/</url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java
new file mode 100644
index 0000000..b4a8d67
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.impala.yarn.server.resourcemanager.resource;
+//YARNUTIL: DUMMY IMPL
+
+public class ResourceWeights {
+
+ public ResourceWeights(float f) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
new file mode 100644
index 0000000..f304c6d
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: MODIFIED
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.impala.yarn.server.resourcemanager.resource.ResourceWeights;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class AllocationConfiguration {
+ private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
+ private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
+ private static final ResourceCalculator RESOURCE_CALCULATOR =
+ new DefaultResourceCalculator();
+ // Minimum resource allocation for each queue
+ private final Map<String, Resource> minQueueResources;
+ // Maximum amount of resources per queue
+ @VisibleForTesting
+ final Map<String, Resource> maxQueueResources;
+
+ private final Resource queueMaxResourcesDefault;
+
+ // ACL's for each queue. Only specifies non-default ACL's from configuration.
+ private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
+
+ // Policy for mapping apps to queues
+ @VisibleForTesting
+ QueuePlacementPolicy placementPolicy;
+
+ //Configured queues in the alloc xml
+ @VisibleForTesting
+ Map<FSQueueType, Set<String>> configuredQueues;
+
+ public AllocationConfiguration(Map<String, Resource> minQueueResources,
+ Map<String, Resource> maxQueueResources,
+ Map<String, Resource> maxChildQueueResources,
+ Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
+ Map<String, ResourceWeights> queueWeights,
+ Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
+ int queueMaxAppsDefault, Resource queueMaxResourcesDefault,
+ float queueMaxAMShareDefault,
+ Map<String, SchedulingPolicy> schedulingPolicies,
+ SchedulingPolicy defaultSchedulingPolicy,
+ Map<String, Long> minSharePreemptionTimeouts,
+ Map<String, Long> fairSharePreemptionTimeouts,
+ Map<String, Float> fairSharePreemptionThresholds,
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+ QueuePlacementPolicy placementPolicy,
+ Map<FSQueueType, Set<String>> configuredQueues,
+ Set<String> nonPreemptableQueues) {
+ this.minQueueResources = minQueueResources;
+ this.maxQueueResources = maxQueueResources;
+ this.queueMaxResourcesDefault = queueMaxResourcesDefault;
+ this.queueAcls = queueAcls;
+ this.placementPolicy = placementPolicy;
+ this.configuredQueues = configuredQueues;
+ }
+
+ public AllocationConfiguration(Configuration conf) {
+ minQueueResources = new HashMap<>();
+ maxQueueResources = new HashMap<>();
+ queueMaxResourcesDefault = Resources.unbounded();
+ queueAcls = new HashMap<>();
+ configuredQueues = new HashMap<>();
+ for (FSQueueType queueType : FSQueueType.values()) {
+ configuredQueues.put(queueType, new HashSet<String>());
+ }
+ placementPolicy =
+ QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
+ }
+
+ /**
+ * Get the ACLs associated with this queue. If a given ACL is not explicitly
+ * configured, include the default value for that ACL. The default for the
+ * root queue is everybody ("*") and the default for all other queues is
+ * nobody ("")
+ */
+ public AccessControlList getQueueAcl(String queue, QueueACL operation) {
+ Map<QueueACL, AccessControlList> queueAcls = this.queueAcls.get(queue);
+ if (queueAcls != null) {
+ AccessControlList operationAcl = queueAcls.get(operation);
+ if (operationAcl != null) {
+ return operationAcl;
+ }
+ }
+ return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL;
+ }
+
+ /**
+ * Get the minimum resource allocation for the given queue.
+ *
+ * @param queue the target queue's name
+ * @return the min allocation on this queue or {@link Resources#none}
+ * if not set
+ */
+ public Resource getMinResources(String queue) {
+ Resource minQueueResource = minQueueResources.get(queue);
+ return (minQueueResource == null) ? Resources.none() : minQueueResource;
+ }
+
+ /**
+ * Set the maximum resource allocation for the given queue.
+ *
+ * @param queue the target queue
+ * @param maxResource the maximum resource allocation
+ */
+ void setMaxResources(String queue, Resource maxResource) {
+ maxQueueResources.put(queue, maxResource);
+ }
+
+ /**
+ * Get the maximum resource allocation for the given queue. If the max in not
+ * set, return the larger of the min and the default max.
+ *
+ * @param queue the target queue's name
+ * @return the max allocation on this queue
+ */
+ public Resource getMaxResources(String queue) {
+ Resource maxQueueResource = maxQueueResources.get(queue);
+ if (maxQueueResource == null) {
+ Resource minQueueResource = minQueueResources.get(queue);
+ if (minQueueResource != null &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, Resources.unbounded(),
+ minQueueResource, queueMaxResourcesDefault)) {
+ return minQueueResource;
+ } else {
+ return queueMaxResourcesDefault;
+ }
+ } else {
+ return maxQueueResource;
+ }
+ }
+
+ public boolean hasAccess(String queueName, QueueACL acl,
+ UserGroupInformation user) {
+ int lastPeriodIndex = queueName.length();
+ while (lastPeriodIndex != -1) {
+ String queue = queueName.substring(0, lastPeriodIndex);
+ if (getQueueAcl(queue, acl).isUserAllowed(user)) {
+ return true;
+ }
+
+ lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1);
+ }
+
+ return false;
+ }
+
+ public Map<FSQueueType, Set<String>> getConfiguredQueues() {
+ return configuredQueues;
+ }
+
+ public QueuePlacementPolicy getPlacementPolicy() {
+ return placementPolicy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
new file mode 100644
index 0000000..1f848fb
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: VANILLA
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Thrown when the allocation file for {@link QueueManager} is malformed.
+ */
+@Private
+@Unstable
+public class AllocationConfigurationException extends Exception {
+ private static final long serialVersionUID = 4046517047810854249L;
+
+ public AllocationConfigurationException(String message) {
+ super(message);
+ }
+
+ public AllocationConfigurationException(String message, Throwable t) {
+ super(message, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
new file mode 100644
index 0000000..ed9943a
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -0,0 +1,536 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: VANILLA
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.impala.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.annotations.VisibleForTesting;
+
+@Public
+@Unstable
+public class AllocationFileLoaderService extends AbstractService {
+
+ public static final Log LOG = LogFactory.getLog(
+ AllocationFileLoaderService.class.getName());
+
+ /** Time to wait between checks of the allocation file */
+ public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000;
+
+ /**
+ * Time to wait after the allocation has been modified before reloading it
+ * (this is done to prevent loading a file that hasn't been fully written).
+ */
+ public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
+
+ public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
+ private final Clock clock;
+
+ private long lastSuccessfulReload; // Last time we successfully reloaded queues
+ private boolean lastReloadAttemptFailed = false;
+
+ // Path to XML file containing allocations.
+ private File allocFile;
+
+ private Listener reloadListener;
+
+ @VisibleForTesting
+ long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
+
+ private Thread reloadThread;
+ private volatile boolean running = true;
+
+ public AllocationFileLoaderService() {
+ this(new SystemClock());
+ }
+
+ public AllocationFileLoaderService(Clock clock) {
+ super(AllocationFileLoaderService.class.getName());
+ this.clock = clock;
+
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ this.allocFile = getAllocationFile(conf);
+ if (allocFile != null) {
+ reloadThread = new Thread() {
+ @Override
+ public void run() {
+ while (running) {
+ long time = clock.getTime();
+ long lastModified = allocFile.lastModified();
+ if (lastModified > lastSuccessfulReload &&
+ time > lastModified + ALLOC_RELOAD_WAIT_MS) {
+ try {
+ reloadAllocations();
+ } catch (Exception ex) {
+ if (!lastReloadAttemptFailed) {
+ LOG.error("Failed to reload fair scheduler config file - " +
+ "will use existing allocations.", ex);
+ }
+ lastReloadAttemptFailed = true;
+ }
+ } else if (lastModified == 0l) {
+ if (!lastReloadAttemptFailed) {
+ LOG.warn("Failed to reload fair scheduler config file because" +
+ " last modified returned 0. File exists: "
+ + allocFile.exists());
+ }
+ lastReloadAttemptFailed = true;
+ }
+ try {
+ Thread.sleep(reloadIntervalMs);
+ } catch (InterruptedException ex) {
+ LOG.info(
+ "Interrupted while waiting to reload alloc configuration");
+ }
+ }
+ }
+ };
+ reloadThread.setName("AllocationFileReloader");
+ reloadThread.setDaemon(true);
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ if (reloadThread != null) {
+ reloadThread.start();
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ running = false;
+ if (reloadThread != null) {
+ reloadThread.interrupt();
+ try {
+ reloadThread.join(THREAD_JOIN_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ LOG.warn("reloadThread fails to join.");
+ }
+ }
+ super.serviceStop();
+ }
+
+ /**
+ * Path to XML file containing allocations. If the
+ * path is relative, it is searched for in the
+ * classpath, but loaded like a regular File.
+ */
+ public File getAllocationFile(Configuration conf) {
+ String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
+ FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);
+ File allocFile = new File(allocFilePath);
+ if (!allocFile.isAbsolute()) {
+ URL url = Thread.currentThread().getContextClassLoader()
+ .getResource(allocFilePath);
+ if (url == null) {
+ LOG.warn(allocFilePath + " not found on the classpath.");
+ allocFile = null;
+ } else if (!url.getProtocol().equalsIgnoreCase("file")) {
+ throw new RuntimeException("Allocation file " + url
+ + " found on the classpath is not on the local filesystem.");
+ } else {
+ allocFile = new File(url.getPath());
+ }
+ }
+ return allocFile;
+ }
+
+ public synchronized void setReloadListener(Listener reloadListener) {
+ this.reloadListener = reloadListener;
+ }
+
+ /**
+ * Updates the allocation list from the allocation config file. This file is
+ * expected to be in the XML format specified in the design doc.
+ *
+ * @throws IOException if the config file cannot be read.
+ * @throws AllocationConfigurationException if allocations are invalid.
+ * @throws ParserConfigurationException if XML parser is misconfigured.
+ * @throws SAXException if config file is malformed.
+ */
+ public synchronized void reloadAllocations() throws IOException,
+ ParserConfigurationException, SAXException,
+ AllocationConfigurationException {
+ if (allocFile == null) {
+ return;
+ }
+ LOG.info("Loading allocation file " + allocFile);
+ // Create some temporary hashmaps to hold the new allocs, and we only save
+ // them in our fields if we have parsed the entire allocs file successfully.
+ Map<String, Resource> minQueueResources = new HashMap<>();
+ Map<String, Resource> maxQueueResources = new HashMap<>();
+ Map<String, Resource> maxChildQueueResources = new HashMap<>();
+ Map<String, Integer> queueMaxApps = new HashMap<>();
+ Map<String, Integer> userMaxApps = new HashMap<>();
+ Map<String, Float> queueMaxAMShares = new HashMap<>();
+ Map<String, ResourceWeights> queueWeights = new HashMap<>();
+ Map<String, SchedulingPolicy> queuePolicies = new HashMap<>();
+ Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
+ Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
+ Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
+ Set<String> nonPreemptableQueues = new HashSet<>();
+ int userMaxAppsDefault = Integer.MAX_VALUE;
+ int queueMaxAppsDefault = Integer.MAX_VALUE;
+ Resource queueMaxResourcesDefault = Resources.unbounded();
+ float queueMaxAMShareDefault = 0.5f;
+ long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
+ long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+ float defaultFairSharePreemptionThreshold = 0.5f;
+ SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
+
+ QueuePlacementPolicy newPlacementPolicy = null;
+
+ // Remember all queue names so we can display them on web UI, etc.
+ // configuredQueues is segregated based on whether it is a leaf queue
+ // or a parent queue. This information is used for creating queues
+ // and also for making queue placement decisions(QueuePlacementRule.java).
+ Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>();
+
+ for (FSQueueType queueType : FSQueueType.values()) {
+ configuredQueues.put(queueType, new HashSet<String>());
+ }
+
+ // Read and parse the allocations file.
+ DocumentBuilderFactory docBuilderFactory =
+ DocumentBuilderFactory.newInstance();
+ docBuilderFactory.setIgnoringComments(true);
+ DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+ Document doc = builder.parse(allocFile);
+ Element root = doc.getDocumentElement();
+ if (!"allocations".equals(root.getTagName()))
+ throw new AllocationConfigurationException("Bad fair scheduler config " +
+ "file: top-level element not <allocations>");
+ NodeList elements = root.getChildNodes();
+ List<Element> queueElements = new ArrayList<Element>();
+ Element placementPolicyElement = null;
+ for (int i = 0; i < elements.getLength(); i++) {
+ Node node = elements.item(i);
+ if (node instanceof Element) {
+ Element element = (Element)node;
+ if ("queue".equals(element.getTagName()) ||
+ "pool".equals(element.getTagName())) {
+ queueElements.add(element);
+ } else if ("user".equals(element.getTagName())) {
+ String userName = element.getAttribute("name");
+ NodeList fields = element.getChildNodes();
+ for (int j = 0; j < fields.getLength(); j++) {
+ Node fieldNode = fields.item(j);
+ if (!(fieldNode instanceof Element))
+ continue;
+ Element field = (Element) fieldNode;
+ if ("maxRunningApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ userMaxApps.put(userName, val);
+ }
+ }
+ } else if ("queueMaxResourcesDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ Resource val =
+ FairSchedulerConfiguration.parseResourceConfigValue(text);
+ queueMaxResourcesDefault = val;
+ } else if ("userMaxAppsDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ userMaxAppsDefault = val;
+ } else if ("defaultFairSharePreemptionTimeout"
+ .equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ defaultFairSharePreemptionTimeout = val;
+ } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
+ if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ defaultFairSharePreemptionTimeout = val;
+ }
+ } else if ("defaultMinSharePreemptionTimeout"
+ .equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ defaultMinSharePreemptionTimeout = val;
+ } else if ("defaultFairSharePreemptionThreshold"
+ .equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.max(Math.min(val, 1.0f), 0.0f);
+ defaultFairSharePreemptionThreshold = val;
+ } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ queueMaxAppsDefault = val;
+ } else if ("queueMaxAMShareDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.min(val, 1.0f);
+ queueMaxAMShareDefault = val;
+ } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
+ || "defaultQueueSchedulingMode".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ if (text.equalsIgnoreCase("FIFO")) {
+ throw new AllocationConfigurationException("Bad fair scheduler "
+ + "config file: defaultQueueSchedulingPolicy or "
+ + "defaultQueueSchedulingMode can't be FIFO.");
+ }
+ defaultSchedPolicy = SchedulingPolicy.parse(text);
+ } else if ("queuePlacementPolicy".equals(element.getTagName())) {
+ placementPolicyElement = element;
+ } else {
+ LOG.warn("Bad element in allocations file: " + element.getTagName());
+ }
+ }
+ }
+
+ // Load queue elements. A root queue can either be included or omitted. If
+ // it's included, all other queues must be inside it.
+ for (Element element : queueElements) {
+ String parent = "root";
+ if (element.getAttribute("name").equalsIgnoreCase("root")) {
+ if (queueElements.size() > 1) {
+ throw new AllocationConfigurationException("If configuring root queue,"
+ + " no other queues can be placed alongside it.");
+ }
+ parent = null;
+ }
+ loadQueue(parent, element, minQueueResources, maxQueueResources,
+ maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
+ queueWeights, queuePolicies, minSharePreemptionTimeouts,
+ fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
+ queueAcls, configuredQueues, nonPreemptableQueues);
+ }
+
+ // Load placement policy and pass it configured queues
+ Configuration conf = getConfig();
+ if (placementPolicyElement != null) {
+ newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
+ configuredQueues, conf);
+ } else {
+ newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
+ configuredQueues);
+ }
+
+ // Set the min/fair share preemption timeout for the root queue
+ if (!minSharePreemptionTimeouts.containsKey("root")){
+ minSharePreemptionTimeouts.put("root",
+ defaultMinSharePreemptionTimeout);
+ }
+ if (!fairSharePreemptionTimeouts.containsKey("root")) {
+ fairSharePreemptionTimeouts.put("root",
+ defaultFairSharePreemptionTimeout);
+ }
+
+ // Set the fair share preemption threshold for the root queue
+ if (!fairSharePreemptionThresholds.containsKey("root")) {
+ fairSharePreemptionThresholds.put("root",
+ defaultFairSharePreemptionThreshold);
+ }
+
+ AllocationConfiguration info =
+ new AllocationConfiguration(minQueueResources, maxQueueResources,
+ maxChildQueueResources, queueMaxApps, userMaxApps, queueWeights,
+ queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
+ queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
+ defaultSchedPolicy, minSharePreemptionTimeouts,
+ fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
+ newPlacementPolicy, configuredQueues, nonPreemptableQueues);
+ lastSuccessfulReload = clock.getTime();
+ lastReloadAttemptFailed = false;
+
+ reloadListener.onReload(info);
+ }
+
+ /**
+ * Loads a queue from a queue element in the configuration file
+ */
+ private void loadQueue(String parentName, Element element,
+ Map<String, Resource> minQueueResources,
+ Map<String, Resource> maxQueueResources,
+ Map<String, Resource> maxChildQueueResources,
+ Map<String, Integer> queueMaxApps,
+ Map<String, Integer> userMaxApps,
+ Map<String, Float> queueMaxAMShares,
+ Map<String, ResourceWeights> queueWeights,
+ Map<String, SchedulingPolicy> queuePolicies,
+ Map<String, Long> minSharePreemptionTimeouts,
+ Map<String, Long> fairSharePreemptionTimeouts,
+ Map<String, Float> fairSharePreemptionThresholds,
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+ Map<FSQueueType, Set<String>> configuredQueues,
+ Set<String> nonPreemptableQueues)
+ throws AllocationConfigurationException {
+ String queueName = CharMatcher.WHITESPACE.trimFrom(
+ element.getAttribute("name"));
+
+ if (queueName.contains(".")) {
+ throw new AllocationConfigurationException("Bad fair scheduler config "
+ + "file: queue name (" + queueName + ") shouldn't contain period.");
+ }
+
+ if (queueName.isEmpty()) {
+ throw new AllocationConfigurationException("Bad fair scheduler config "
+ + "file: queue name shouldn't be empty or "
+ + "consist only of whitespace.");
+ }
+
+ if (parentName != null) {
+ queueName = parentName + "." + queueName;
+ }
+
+ Map<QueueACL, AccessControlList> acls = new HashMap<>();
+ NodeList fields = element.getChildNodes();
+ boolean isLeaf = true;
+
+ for (int j = 0; j < fields.getLength(); j++) {
+ Node fieldNode = fields.item(j);
+ if (!(fieldNode instanceof Element))
+ continue;
+ Element field = (Element) fieldNode;
+ if ("minResources".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ Resource val =
+ FairSchedulerConfiguration.parseResourceConfigValue(text);
+ minQueueResources.put(queueName, val);
+ } else if ("maxResources".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ Resource val =
+ FairSchedulerConfiguration.parseResourceConfigValue(text);
+ maxQueueResources.put(queueName, val);
+ } else if ("maxChildResources".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ Resource val =
+ FairSchedulerConfiguration.parseResourceConfigValue(text);
+ maxChildQueueResources.put(queueName, val);
+ } else if ("maxRunningApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ queueMaxApps.put(queueName, val);
+ } else if ("maxAMShare".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.min(val, 1.0f);
+ queueMaxAMShares.put(queueName, val);
+ } else if ("weight".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ double val = Double.parseDouble(text);
+ queueWeights.put(queueName, new ResourceWeights((float)val));
+ } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ minSharePreemptionTimeouts.put(queueName, val);
+ } else if ("fairSharePreemptionTimeout".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ fairSharePreemptionTimeouts.put(queueName, val);
+ } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ float val = Float.parseFloat(text);
+ val = Math.max(Math.min(val, 1.0f), 0.0f);
+ fairSharePreemptionThresholds.put(queueName, val);
+ } else if ("schedulingPolicy".equals(field.getTagName())
+ || "schedulingMode".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ SchedulingPolicy policy = SchedulingPolicy.parse(text);
+ queuePolicies.put(queueName, policy);
+ } else if ("aclSubmitApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData();
+ acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+ } else if ("aclAdministerApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData();
+ acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+ } else if ("allowPreemptionFrom".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ if (!Boolean.parseBoolean(text)) {
+ nonPreemptableQueues.add(queueName);
+ }
+ } else if ("queue".endsWith(field.getTagName()) ||
+ "pool".equals(field.getTagName())) {
+ loadQueue(queueName, field, minQueueResources, maxQueueResources,
+ maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
+ queueWeights, queuePolicies, minSharePreemptionTimeouts,
+ fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
+ queueAcls, configuredQueues, nonPreemptableQueues);
+ configuredQueues.get(FSQueueType.PARENT).add(queueName);
+ isLeaf = false;
+ }
+ }
+ if (isLeaf) {
+ // if a leaf in the alloc file is marked as type='parent'
+ // then store it under 'parent'
+ if ("parent".equals(element.getAttribute("type"))) {
+ configuredQueues.get(FSQueueType.PARENT).add(queueName);
+ } else {
+ configuredQueues.get(FSQueueType.LEAF).add(queueName);
+ }
+ }
+ queueAcls.put(queueName, acls);
+ if (maxQueueResources.containsKey(queueName) &&
+ minQueueResources.containsKey(queueName)
+ && !Resources.fitsIn(minQueueResources.get(queueName),
+ maxQueueResources.get(queueName))) {
+ LOG.warn(
+ String.format("Queue %s has max resources %s less than "
+ + "min resources %s", queueName, maxQueueResources.get(queueName),
+ minQueueResources.get(queueName)));
+ }
+ }
+
+ public interface Listener {
+ public void onReload(AllocationConfiguration info);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java
new file mode 100644
index 0000000..ba8912d
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: VANILLA
+
+public enum FSQueueType {
+ /*
+ * Represents a leaf queue
+ */
+ LEAF,
+
+ /*
+ * Represents a parent queue
+ */
+ PARENT
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
new file mode 100644
index 0000000..572ef74
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: MODIFIED
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.impala.yarn.server.utils.BuilderUtils;
+
+public class FairSchedulerConfiguration extends Configuration {
+
+ private static final String CONF_PREFIX = "yarn.scheduler.fair.";
+
+ public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
+ protected static final String DEFAULT_ALLOCATION_FILE = "fair-scheduler.xml";
+
+ /** Whether pools can be created that were not specified in the FS configuration file
+ */
+ protected static final String ALLOW_UNDECLARED_POOLS = CONF_PREFIX + "allow-undeclared-pools";
+ protected static final boolean DEFAULT_ALLOW_UNDECLARED_POOLS = true;
+
+ /**
+ * Whether to use the user name as the queue name (instead of "default") if
+ * the request does not specify a queue.
+ */
+ protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
+ protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
+
+ /**
+ * Parses a resource config value of a form like "1024", "1024 mb",
+ * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.
+ *
+ * @throws AllocationConfigurationException
+ */
+ public static Resource parseResourceConfigValue(String val)
+ throws AllocationConfigurationException {
+ try {
+ val = val.toLowerCase();
+ int memory = findResource(val, "mb");
+ int vcores = findResource(val, "vcores");
+ return BuilderUtils.newResource(memory, vcores);
+ } catch (AllocationConfigurationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new AllocationConfigurationException(
+ "Error reading resource config", ex);
+ }
+ }
+
+ private static int findResource(String val, String units)
+ throws AllocationConfigurationException {
+ Pattern pattern = Pattern.compile("(\\d+)(\\.\\d*)?\\s*" + units);
+ Matcher matcher = pattern.matcher(val);
+ if (!matcher.find()) {
+ throw new AllocationConfigurationException("Missing resource: " + units);
+ }
+ return Integer.parseInt(matcher.group(1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
new file mode 100644
index 0000000..74c690c
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: VANILLA
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+@Private
+@Unstable
+public class QueuePlacementPolicy {
+ private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses;
+ static {
+ Map<String, Class<? extends QueuePlacementRule>> map =
+ new HashMap<String, Class<? extends QueuePlacementRule>>();
+ map.put("user", QueuePlacementRule.User.class);
+ map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
+ map.put("secondaryGroupExistingQueue",
+ QueuePlacementRule.SecondaryGroupExistingQueue.class);
+ map.put("specified", QueuePlacementRule.Specified.class);
+ map.put("nestedUserQueue",
+ QueuePlacementRule.NestedUserQueue.class);
+ map.put("default", QueuePlacementRule.Default.class);
+ map.put("reject", QueuePlacementRule.Reject.class);
+ ruleClasses = Collections.unmodifiableMap(map);
+ }
+
+ private final List<QueuePlacementRule> rules;
+ private final Map<FSQueueType, Set<String>> configuredQueues;
+ private final Groups groups;
+
+ public QueuePlacementPolicy(List<QueuePlacementRule> rules,
+ Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+ throws AllocationConfigurationException {
+ for (int i = 0; i < rules.size()-1; i++) {
+ if (rules.get(i).isTerminal()) {
+ throw new AllocationConfigurationException("Rules after rule "
+ + i + " in queue placement policy can never be reached");
+ }
+ }
+ if (!rules.get(rules.size()-1).isTerminal()) {
+ throw new AllocationConfigurationException(
+ "Could get past last queue placement rule without assigning");
+ }
+ this.rules = rules;
+ this.configuredQueues = configuredQueues;
+ groups = new Groups(conf);
+ }
+
+ /**
+ * Builds a QueuePlacementPolicy from an xml element.
+ */
+ public static QueuePlacementPolicy fromXml(Element el,
+ Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+ throws AllocationConfigurationException {
+ List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+ NodeList elements = el.getChildNodes();
+ for (int i = 0; i < elements.getLength(); i++) {
+ Node node = elements.item(i);
+ if (node instanceof Element) {
+ QueuePlacementRule rule = createAndInitializeRule(node);
+ rules.add(rule);
+ }
+ }
+ return new QueuePlacementPolicy(rules, configuredQueues, conf);
+ }
+
+ /**
+ * Create and initialize a rule given a xml node
+ * @param node
+ * @return QueuePlacementPolicy
+ * @throws AllocationConfigurationException
+ */
+ public static QueuePlacementRule createAndInitializeRule(Node node)
+ throws AllocationConfigurationException {
+ Element element = (Element) node;
+
+ String ruleName = element.getAttribute("name");
+ if ("".equals(ruleName)) {
+ throw new AllocationConfigurationException("No name provided for a "
+ + "rule element");
+ }
+
+ Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
+ if (clazz == null) {
+ throw new AllocationConfigurationException("No rule class found for "
+ + ruleName);
+ }
+ QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
+ rule.initializeFromXml(element);
+ return rule;
+ }
+
+ /**
+ * Build a simple queue placement policy from the allow-undeclared-pools and
+ * user-as-default-queue configuration options.
+ */
+ public static QueuePlacementPolicy fromConfiguration(Configuration conf,
+ Map<FSQueueType, Set<String>> configuredQueues) {
+ boolean create = conf.getBoolean(
+ FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
+ FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
+ boolean userAsDefaultQueue = conf.getBoolean(
+ FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
+ FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE);
+ List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+ rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+ if (userAsDefaultQueue) {
+ rules.add(new QueuePlacementRule.User().initialize(create, null));
+ }
+ if (!userAsDefaultQueue || !create) {
+ rules.add(new QueuePlacementRule.Default().initialize(true, null));
+ }
+ try {
+ return new QueuePlacementPolicy(rules, configuredQueues, conf);
+ } catch (AllocationConfigurationException ex) {
+ throw new RuntimeException("Should never hit exception when loading" +
+ "placement policy from conf", ex);
+ }
+ }
+
+ /**
+ * Applies this rule to an app with the given requested queue and user/group
+ * information.
+ *
+ * @param requestedQueue
+ * The queue specified in the ApplicationSubmissionContext
+ * @param user
+ * The user submitting the app
+ * @return
+ * The name of the queue to assign the app to. Or null if the app should
+ * be rejected.
+ * @throws IOException
+ * If an exception is encountered while getting the user's groups
+ */
+ public String assignAppToQueue(String requestedQueue, String user)
+ throws IOException {
+ for (QueuePlacementRule rule : rules) {
+ String queue = rule.assignAppToQueue(requestedQueue, user, groups,
+ configuredQueues);
+ if (queue == null || !queue.isEmpty()) {
+ return queue;
+ }
+ }
+ throw new IllegalStateException("Should have applied a rule before " +
+ "reaching here");
+ }
+
+ public List<QueuePlacementRule> getRules() {
+ return rules;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
new file mode 100644
index 0000000..4be9f73
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+
+//YARNUTIL: VANILLA
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.w3c.dom.Element;
+import org.w3c.dom.NamedNodeMap;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
+public abstract class QueuePlacementRule {
+ protected boolean create;
+ public static final Log LOG =
+ LogFactory.getLog(QueuePlacementRule.class.getName());
+
+ /**
+ * Initializes the rule with any arguments.
+ *
+ * @param args
+ * Additional attributes of the rule's xml element other than create.
+ */
+ public QueuePlacementRule initialize(boolean create, Map<String, String> args) {
+ this.create = create;
+ return this;
+ }
+
+ /**
+ *
+ * @param requestedQueue
+ * The queue explicitly requested.
+ * @param user
+ * The user submitting the app.
+ * @param groups
+ * The groups of the user submitting the app.
+ * @param configuredQueues
+ * The queues specified in the scheduler configuration.
+ * @return
+ * The queue to place the app into. An empty string indicates that we should
+ * continue to the next rule, and null indicates that the app should be rejected.
+ */
+ public String assignAppToQueue(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
+ String queue = getQueueForApp(requestedQueue, user, groups,
+ configuredQueues);
+ if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
+ || configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
+ return queue;
+ } else {
+ return "";
+ }
+ }
+
+ public void initializeFromXml(Element el)
+ throws AllocationConfigurationException {
+ boolean create = true;
+ NamedNodeMap attributes = el.getAttributes();
+ Map<String, String> args = new HashMap<String, String>();
+ for (int i = 0; i < attributes.getLength(); i++) {
+ Node node = attributes.item(i);
+ String key = node.getNodeName();
+ String value = node.getNodeValue();
+ if (key.equals("create")) {
+ create = Boolean.parseBoolean(value);
+ } else {
+ args.put(key, value);
+ }
+ }
+ initialize(create, args);
+ }
+
+ /**
+ * Returns true if this rule never tells the policy to continue.
+ */
+ public abstract boolean isTerminal();
+
+ /**
+ * Applies this rule to an app with the given requested queue and user/group
+ * information.
+ *
+ * @param requestedQueue
+ * The queue specified in the ApplicationSubmissionContext
+ * @param user
+ * The user submitting the app.
+ * @param groups
+ * The groups of the user submitting the app.
+ * @return
+ * The name of the queue to assign the app to, or null to empty string
+ * continue to the next rule.
+ */
+ protected abstract String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException;
+
+ /**
+ * Places apps in queues by username of the submitter
+ */
+ public static class User extends QueuePlacementRule {
+ @Override
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+ return "root." + cleanName(user);
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return create;
+ }
+ }
+
+ /**
+ * Places apps in queues by primary group of the submitter
+ */
+ public static class PrimaryGroup extends QueuePlacementRule {
+ @Override
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
+ final List<String> groupList = groups.getGroups(user);
+ if (groupList.isEmpty()) {
+ throw new IOException("No groups returned for user " + user);
+ }
+ return "root." + cleanName(groupList.get(0));
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return create;
+ }
+ }
+
+ /**
+ * Places apps in queues by secondary group of the submitter
+ *
+ * Match will be made on first secondary group that exist in
+ * queues
+ */
+ public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
+ @Override
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
+ List<String> groupNames = groups.getGroups(user);
+ for (int i = 1; i < groupNames.size(); i++) {
+ String group = cleanName(groupNames.get(i));
+ if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
+ || configuredQueues.get(FSQueueType.PARENT).contains(
+ "root." + group)) {
+ return "root." + group;
+ }
+ }
+
+ return "";
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ /**
+ * Places apps in queues with name of the submitter under the queue
+ * returned by the nested rule.
+ */
+ public static class NestedUserQueue extends QueuePlacementRule {
+ @VisibleForTesting
+ QueuePlacementRule nestedRule;
+
+ /**
+ * Parse xml and instantiate the nested rule
+ */
+ @Override
+ public void initializeFromXml(Element el)
+ throws AllocationConfigurationException {
+ NodeList elements = el.getChildNodes();
+
+ for (int i = 0; i < elements.getLength(); i++) {
+ Node node = elements.item(i);
+ if (node instanceof Element) {
+ Element element = (Element) node;
+ if ("rule".equals(element.getTagName())) {
+ QueuePlacementRule rule = QueuePlacementPolicy
+ .createAndInitializeRule(node);
+ if (rule == null) {
+ throw new AllocationConfigurationException(
+ "Unable to create nested rule in nestedUserQueue rule");
+ }
+ this.nestedRule = rule;
+ break;
+ } else {
+ continue;
+ }
+ }
+ }
+
+ if (this.nestedRule == null) {
+ throw new AllocationConfigurationException(
+ "No nested rule specified in <nestedUserQueue> rule");
+ }
+ super.initializeFromXml(el);
+ }
+
+ @Override
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
+ // Apply the nested rule
+ String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
+ groups, configuredQueues);
+
+ if (queueName != null && queueName.length() != 0) {
+ if (!queueName.startsWith("root.")) {
+ queueName = "root." + queueName;
+ }
+
+ // Verify if the queue returned by the nested rule is an configured leaf queue,
+ // if yes then skip to next rule in the queue placement policy
+ if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
+ return "";
+ }
+ return queueName + "." + cleanName(user);
+ }
+ return queueName;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ /**
+ * Places apps in queues by requested queue of the submitter
+ */
+ public static class Specified extends QueuePlacementRule {
+ @Override
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+ if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
+ return "";
+ } else {
+ if (!requestedQueue.startsWith("root.")) {
+ requestedQueue = "root." + requestedQueue;
+ }
+ return requestedQueue;
+ }
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ /**
+ * Places apps in the specified default queue. If no default queue is
+ * specified the app is placed in root.default queue.
+ */
+ public static class Default extends QueuePlacementRule {
+ @VisibleForTesting
+ String defaultQueueName;
+
+ @Override
+ public QueuePlacementRule initialize(boolean create,
+ Map<String, String> args) {
+ if (defaultQueueName == null) {
+ defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+ }
+ return super.initialize(create, args);
+ }
+
+ @Override
+ public void initializeFromXml(Element el)
+ throws AllocationConfigurationException {
+ defaultQueueName = el.getAttribute("queue");
+ if (defaultQueueName != null && !defaultQueueName.isEmpty()) {
+ if (!defaultQueueName.startsWith("root.")) {
+ defaultQueueName = "root." + defaultQueueName;
+ }
+ } else {
+ defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+ }
+ super.initializeFromXml(el);
+ }
+
+ @Override
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+ return defaultQueueName;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return true;
+ }
+ }
+
+ /**
+ * Rejects all apps
+ */
+ public static class Reject extends QueuePlacementRule {
+ @Override
+ public String assignAppToQueue(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+ return null;
+ }
+
+ @Override
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return true;
+ }
+ }
+
+ /**
+ * Replace the periods in the username or groupname with "_dot_" and
+ * remove trailing and leading whitespace.
+ */
+ protected String cleanName(String name) {
+ name = name.trim();
+ if (name.contains(".")) {
+ String converted = name.replaceAll("\\.", "_dot_");
+ LOG.warn("Name " + name + " is converted to " + converted
+ + " when it is used as a queue name.");
+ return converted;
+ } else {
+ return name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
new file mode 100644
index 0000000..0d19614
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.yarn.server.resourcemanager.scheduler.fair;
+//YARNUTIL: DUMMY IMPL
+
+public class SchedulingPolicy {
+ public static final SchedulingPolicy DEFAULT_POLICY = null;
+
+ public static SchedulingPolicy parse(String s) {
+ return DEFAULT_POLICY;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java
new file mode 100644
index 0000000..e62bc25
--- /dev/null
+++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java
@@ -0,0 +1,41 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.impala.yarn.server.utils;
+//YARNUTIL: MODIFIED
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * Builder utilities to construct various objects.
+ *
+ */
+public class BuilderUtils {
+
+ private static final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ public static Resource newResource(int memory, int vCores) {
+ Resource resource = recordFactory.newRecordInstance(Resource.class);
+ resource.setMemory(memory);
+ resource.setVirtualCores(vCores);
+ return resource;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/fe/CMakeLists.txt b/fe/CMakeLists.txt
index eee1601..d5c7c1e 100644
--- a/fe/CMakeLists.txt
+++ b/fe/CMakeLists.txt
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-add_custom_target(fe ALL DEPENDS thrift-deps fb-deps function-registry ext-data-source
+add_custom_target(fe ALL DEPENDS
+ thrift-deps fb-deps yarn-extras function-registry ext-data-source
COMMAND $ENV{IMPALA_HOME}/bin/mvn-quiet.sh install -DskipTests
)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index 0f7ae8c..a0330d7 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -47,6 +47,7 @@ under the License.
<impala.extdatasrc.api.version>1.0-SNAPSHOT</impala.extdatasrc.api.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kudu.version>${env.KUDU_JAVA_VERSION}</kudu.version>
+ <yarn-extras.version>${project.version}</yarn-extras.version>
<eclipse.output.directory>eclipse-classes</eclipse.output.directory>
</properties>
@@ -98,24 +99,18 @@ under the License.
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.sentry</groupId>
<artifactId>sentry-core-common</artifactId>
<version>${sentry.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.impala</groupId>
+ <artifactId>yarn-extras</artifactId>
+ <version>${yarn-extras.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.sentry</groupId>
<artifactId>sentry-core-model-db</artifactId>
<version>${sentry.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
index 9976623..5cea0e1 100644
--- a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
+++ b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
@@ -28,9 +28,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -49,6 +46,9 @@ import org.apache.impala.thrift.TResolveRequestPoolParams;
import org.apache.impala.thrift.TResolveRequestPoolResult;
import org.apache.impala.thrift.TStatus;
import org.apache.impala.util.FileWatchService.FileChangeListener;
+import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
+import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
index e849a81..e84a9bc 100644
--- a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
+++ b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
@@ -22,16 +22,25 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
+
+import org.apache.impala.authorization.User;
import org.apache.impala.common.ByteUnits;
import org.apache.impala.common.InternalException;
+import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TPoolConfig;
import org.apache.impala.thrift.TResolveRequestPoolParams;
@@ -109,6 +118,18 @@ public class TestRequestPoolService {
poolService_.start();
}
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ RuntimeEnv.INSTANCE.setTestEnv(true);
+ User.setRulesForTesting(
+ new Configuration().get(HADOOP_SECURITY_AUTH_TO_LOCAL, "DEFAULT"));
+ }
+
+ @AfterClass
+ public static void cleanUpClass() {
+ RuntimeEnv.INSTANCE.reset();
+ }
+
@After
public void cleanUp() throws Exception {
if (poolService_ != null) poolService_.stop();