You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/31 15:57:48 UTC

[24/32] hadoop git commit: YARN-6599. Support anti-affinity constraint via AppPlacementAllocator. (Wangda Tan via asuresh)

YARN-6599. Support anti-affinity constraint via AppPlacementAllocator. (Wangda Tan via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/38af2379
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/38af2379
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/38af2379

Branch: refs/heads/trunk
Commit: 38af23796971193fa529c3d08ffde8fcd6e607b6
Parents: 8779a35
Author: Arun Suresh <as...@apache.org>
Authored: Thu Jan 18 14:10:30 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 31 01:30:17 2018 -0800

----------------------------------------------------------------------
 .../v2/app/rm/TestRMContainerAllocator.java     |  15 +-
 .../sls/scheduler/SLSCapacityScheduler.java     |  15 +-
 .../yarn/sls/scheduler/SLSFairScheduler.java    |  12 +-
 .../dev-support/findbugs-exclude.xml            |   8 +
 .../yarn/api/resource/PlacementConstraints.java |  43 +-
 .../hadoop/yarn/conf/YarnConfiguration.java     |   2 +-
 ...SchedulerInvalidResoureRequestException.java |  47 ++
 .../api/impl/TestAMRMClientOnRMRestart.java     |   9 +-
 .../impl/pb/AllocateRequestPBImpl.java          |   1 +
 .../server/scheduler/SchedulerRequestKey.java   |  11 +
 .../resourcemanager/DefaultAMSProcessor.java    |  13 +-
 .../rmapp/attempt/RMAppAttemptImpl.java         |   5 +-
 .../scheduler/AbstractYarnScheduler.java        |   3 +-
 .../scheduler/AppSchedulingInfo.java            | 205 +++++--
 .../ApplicationPlacementAllocatorFactory.java   |  68 +++
 .../scheduler/ApplicationPlacementFactory.java  |  63 ---
 .../scheduler/ContainerUpdateContext.java       |   4 +-
 .../scheduler/SchedulerApplicationAttempt.java  |  20 +-
 .../scheduler/YarnScheduler.java                |  15 +-
 .../scheduler/capacity/CapacityScheduler.java   |  54 +-
 .../CapacitySchedulerConfiguration.java         |   5 +
 .../allocator/RegularContainerAllocator.java    |   3 +-
 .../scheduler/common/ContainerRequest.java      |  12 +
 .../scheduler/common/PendingAsk.java            |   6 +
 .../scheduler/common/fica/FiCaSchedulerApp.java |   6 +
 .../constraint/AllocationTagsManager.java       |  71 +--
 .../constraint/AllocationTagsNamespaces.java    |  31 --
 .../constraint/PlacementConstraintsUtil.java    | 165 ++++--
 .../algorithm/DefaultPlacementAlgorithm.java    |   2 +-
 .../processor/PlacementProcessor.java           |   8 +-
 .../scheduler/fair/FairScheduler.java           |  12 +-
 .../scheduler/fifo/FifoScheduler.java           |   7 +-
 .../placement/AppPlacementAllocator.java        |  66 ++-
 .../LocalityAppPlacementAllocator.java          |  35 +-
 .../SingleConstraintAppPlacementAllocator.java  | 531 +++++++++++++++++++
 .../server/resourcemanager/Application.java     |   9 +-
 .../yarn/server/resourcemanager/MockAM.java     |  51 ++
 .../attempt/TestRMAppAttemptTransitions.java    |  10 +-
 .../rmcontainer/TestRMContainerImpl.java        |   6 +-
 .../scheduler/TestAppSchedulingInfo.java        |   4 +-
 .../capacity/CapacitySchedulerTestBase.java     |  79 +++
 .../capacity/TestCapacityScheduler.java         |  90 +---
 .../TestCapacitySchedulerAsyncScheduling.java   |   2 +-
 .../TestCapacitySchedulerAutoQueueCreation.java |   2 +-
 ...apacitySchedulerSchedulingRequestUpdate.java | 260 +++++++++
 .../capacity/TestIncreaseAllocationExpirer.java |   2 +-
 ...estSchedulingRequestContainerAllocation.java | 277 ++++++++++
 ...hedulingRequestContainerAllocationAsync.java | 139 +++++
 .../scheduler/capacity/TestUtils.java           |   2 +
 .../constraint/TestAllocationTagsManager.java   |  30 +-
 .../TestPlacementConstraintsUtil.java           |  36 +-
 .../scheduler/fair/FairSchedulerTestBase.java   |   6 +-
 .../fair/TestContinuousScheduling.java          |  10 +-
 .../scheduler/fair/TestFairScheduler.java       |  30 +-
 .../scheduler/fifo/TestFifoScheduler.java       |  28 +-
 ...stSingleConstraintAppPlacementAllocator.java | 403 ++++++++++++++
 56 files changed, 2557 insertions(+), 492 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 85e4181..7875917 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -1751,6 +1752,7 @@ public class TestRMContainerAllocator {
       super();
       try {
         Configuration conf = new Configuration();
+        init(conf);
         reinitialize(conf, rmContext);
       } catch (IOException ie) {
         LOG.info("add application failed with ", ie);
@@ -1769,8 +1771,8 @@ public class TestRMContainerAllocator {
     @Override
     public synchronized Allocation allocate(
         ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
-        List<ContainerId> release, List<String> blacklistAdditions,
-        List<String> blacklistRemovals,
+        List<SchedulingRequest> schedulingRequests, List<ContainerId> release,
+        List<String> blacklistAdditions, List<String> blacklistRemovals,
         ContainerUpdates updateRequests) {
       List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
       for (ResourceRequest req : ask) {
@@ -1785,7 +1787,7 @@ public class TestRMContainerAllocator {
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       Allocation allocation = super.allocate(
-          applicationAttemptId, askCopy, release, blacklistAdditions,
+          applicationAttemptId, askCopy, schedulingRequests, release, blacklistAdditions,
           blacklistRemovals, updateRequests);
       if (forceResourceLimit != null) {
         // Test wants to force the non-default resource limit
@@ -1805,6 +1807,7 @@ public class TestRMContainerAllocator {
       super();
       try {
         Configuration conf = new Configuration();
+        init(conf);
         reinitialize(conf, rmContext);
       } catch (IOException ie) {
         LOG.info("add application failed with ", ie);
@@ -1815,8 +1818,8 @@ public class TestRMContainerAllocator {
     @Override
     public synchronized Allocation allocate(
         ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
-        List<ContainerId> release, List<String> blacklistAdditions,
-        List<String> blacklistRemovals,
+        List<SchedulingRequest> schedulingRequests, List<ContainerId> release,
+        List<String> blacklistAdditions, List<String> blacklistRemovals,
         ContainerUpdates updateRequests) {
       List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
       for (ResourceRequest req : ask) {
@@ -1827,7 +1830,7 @@ public class TestRMContainerAllocator {
       }
       SecurityUtil.setTokenServiceUseIp(false);
       Allocation normalAlloc = super.allocate(
-          applicationAttemptId, askCopy, release,
+          applicationAttemptId, askCopy, schedulingRequests, release,
           blacklistAdditions, blacklistRemovals, updateRequests);
       List<Container> containers = normalAlloc.getContainers();
       if(containers.size() > 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 6848b22..35f3ed1 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -42,9 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -100,16 +99,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
 
   @Override
   public Allocation allocate(ApplicationAttemptId attemptId,
-      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
-      List<String> strings, List<String> strings2,
-      ContainerUpdates updateRequests) {
+      List<ResourceRequest> resourceRequests,
+      List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds,
+      List<String> strings, List<String> strings2, ContainerUpdates updateRequests) {
     if (metricsON) {
       final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
           .time();
       Allocation allocation = null;
       try {
         allocation = super
-            .allocate(attemptId, resourceRequests, containerIds, strings,
+            .allocate(attemptId, resourceRequests, schedulingRequests,
+                containerIds, strings,
                 strings2, updateRequests);
         return allocation;
       } finally {
@@ -123,7 +123,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
         }
       }
     } else {
-      return super.allocate(attemptId, resourceRequests, containerIds, strings,
+      return super.allocate(attemptId, resourceRequests, schedulingRequests,
+          containerIds, strings,
           strings2, updateRequests);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
index 8e49c51..c27ab3e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -39,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
@@ -94,7 +93,8 @@ public class SLSFairScheduler extends FairScheduler
 
   @Override
   public Allocation allocate(ApplicationAttemptId attemptId,
-      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+      List<ResourceRequest> resourceRequests,
+      List<SchedulingRequest> schedulingRequests, List<ContainerId> containerIds,
       List<String> blacklistAdditions, List<String> blacklistRemovals,
       ContainerUpdates updateRequests) {
     if (metricsON) {
@@ -102,7 +102,8 @@ public class SLSFairScheduler extends FairScheduler
           .time();
       Allocation allocation = null;
       try {
-        allocation = super.allocate(attemptId, resourceRequests, containerIds,
+        allocation = super.allocate(attemptId, resourceRequests,
+            schedulingRequests, containerIds,
             blacklistAdditions, blacklistRemovals, updateRequests);
         return allocation;
       } finally {
@@ -116,7 +117,8 @@ public class SLSFairScheduler extends FairScheduler
         }
       }
     } else {
-      return super.allocate(attemptId, resourceRequests, containerIds,
+      return super.allocate(attemptId, resourceRequests, schedulingRequests,
+          containerIds,
           blacklistAdditions, blacklistRemovals, updateRequests);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 6a10312..81b8825 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -650,4 +650,12 @@
     <Method name="equals" />
     <Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC" />
   </Match>
+
+  <!-- Null pointer exception needs to be ignored here as Findbugs doesn't properly detect code logic -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator" />
+    <Method name="validateAndSetSchedulingRequest" />
+    <Bug pattern="NP_NULL_ON_SOME_PATH" />
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
index c8991cb..ba1beae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
@@ -20,8 +20,12 @@ package org.apache.hadoop.yarn.api.resource;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
@@ -47,6 +51,14 @@ public final class PlacementConstraints {
 
   public static final String NODE = PlacementConstraint.NODE_SCOPE;
   public static final String RACK = PlacementConstraint.RACK_SCOPE;
+  public static final String NODE_PARTITION = "yarn_node_partition/";
+
+  private static final String APPLICATION_LABEL_PREFIX =
+      "yarn_application_label/";
+
+  @InterfaceAudience.Private
+  public static final String APPLICATION_LABEL_INTRA_APPLICATION =
+      APPLICATION_LABEL_PREFIX + "%intra_app%";
 
   /**
    * Creates a constraint that requires allocations to be placed on nodes that
@@ -187,6 +199,20 @@ public final class PlacementConstraints {
     }
 
     /**
+     * Constructs a target expression on a node partition. It is satisfied if
+     * the node's partition is one of the specified nodePartitions.
+     *
+     * @param nodePartitions the set of values that the attribute should take
+     *          values from
+     * @return the resulting expression on the node attribute
+     */
+    public static TargetExpression nodePartition(
+        String... nodePartitions) {
+      return new TargetExpression(TargetType.NODE_ATTRIBUTE, NODE_PARTITION,
+          nodePartitions);
+    }
+
+    /**
      * Constructs a target expression on an allocation tag. It is satisfied if
      * the there are allocations with one of the given tags.
      *
@@ -198,6 +224,22 @@ public final class PlacementConstraints {
       return new TargetExpression(TargetType.ALLOCATION_TAG, null,
           allocationTags);
     }
+
+    /**
+     * Constructs a target expression on an allocation tag. It is satisfied if
+     * there are allocations with one of the given tags. Compared to
+     * {@link PlacementTargets#allocationTag(String...)}, this only checks tags
+     * within the application.
+     *
+     * @param allocationTags the set of tags that the attribute should take
+     *          values from
+     * @return the resulting expression on the allocation tags
+     */
+    public static TargetExpression allocationTagToIntraApp(
+        String... allocationTags) {
+      return new TargetExpression(TargetType.ALLOCATION_TAG,
+          APPLICATION_LABEL_INTRA_APPLICATION, allocationTags);
+    }
   }
 
   // Creation of compound constraints.
@@ -277,5 +319,4 @@ public final class PlacementConstraints {
   public static PlacementConstraint build(AbstractConstraint constraintExpr) {
     return constraintExpr.build();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 367b1ae..f5bb2c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -543,7 +543,7 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
       RM_PREFIX + "placement-constraints.enabled";
 
-  public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = true;
+  public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false;
 
   public static final String RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS =
       RM_PREFIX + "placement-constraints.retry-attempts";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java
new file mode 100644
index 0000000..f55ad83
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This exception is thrown when the scheduler fails to handle a new or
+ * updated {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}/
+ * {@link org.apache.hadoop.yarn.api.records.ResourceRequest} added to the
+ * scheduler.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SchedulerInvalidResoureRequestException extends YarnRuntimeException {
+  private static final long serialVersionUID = 10081123982L;
+
+  public SchedulerInvalidResoureRequestException(String message) {
+    super(message);
+  }
+
+  public SchedulerInvalidResoureRequestException(Throwable cause) {
+    super(cause);
+  }
+
+  public SchedulerInvalidResoureRequestException(String message,
+      Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
index 337d7d4..11d703d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -545,6 +546,7 @@ public class TestAMRMClientOnRMRestart {
       super();
       try {
         Configuration conf = new Configuration();
+        init(conf);
         reinitialize(conf, rmContext);
       } catch (IOException ie) {
         assert (false);
@@ -563,8 +565,8 @@ public class TestAMRMClientOnRMRestart {
     @Override
     public synchronized Allocation allocate(
         ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
-        List<ContainerId> release, List<String> blacklistAdditions,
-        List<String> blacklistRemovals,
+        List<SchedulingRequest> schedulingRequests, List<ContainerId> release,
+        List<String> blacklistAdditions, List<String> blacklistRemovals,
         ContainerUpdates updateRequests) {
       List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
       for (ResourceRequest req : ask) {
@@ -580,7 +582,8 @@ public class TestAMRMClientOnRMRestart {
       lastDecrease = updateRequests.getDecreaseRequests();
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
-      return super.allocate(applicationAttemptId, askCopy, release,
+      return super.allocate(applicationAttemptId, askCopy, schedulingRequests,
+          release,
           blacklistAdditions, blacklistRemovals, updateRequests);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
index b460044..50672a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
@@ -194,6 +194,7 @@ public class AllocateRequestPBImpl extends AllocateRequest {
   public void setSchedulingRequests(
       List<SchedulingRequest> schedulingRequests) {
     if (schedulingRequests == null) {
+      builder.clearSchedulingRequests();
       return;
     }
     initSchedulingRequests();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index c4f37f6..0fce083 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 
 /**
@@ -45,6 +46,16 @@ public final class SchedulerRequestKey implements
         req.getAllocationRequestId(), null);
   }
 
+  /**
+   * Factory method to generate a SchedulerRequestKey from a SchedulingRequest.
+   * @param req SchedulingRequest
+   * @return SchedulerRequestKey
+   */
+  public static SchedulerRequestKey create(SchedulingRequest req) {
+    return new SchedulerRequestKey(req.getPriority(),
+        req.getAllocationRequestId(), null);
+  }
+
   public static SchedulerRequestKey create(UpdateContainerRequest req,
       SchedulerRequestKey schedulerRequestKey) {
     return new SchedulerRequestKey(schedulerRequestKey.getPriority(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 713947f..18ab473 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -273,10 +274,14 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
           " state, ignore container allocate request.");
       allocation = EMPTY_ALLOCATION;
     } else {
-      allocation =
-          getScheduler().allocate(appAttemptId, ask, release,
-              blacklistAdditions, blacklistRemovals,
-              containerUpdateRequests);
+      try {
+        allocation = getScheduler().allocate(appAttemptId, ask,
+            request.getSchedulingRequests(), release,
+            blacklistAdditions, blacklistRemovals, containerUpdateRequests);
+      } catch (SchedulerInvalidResoureRequestException e) {
+        LOG.warn("Exceptions caught when scheduler handling requests");
+        throw new YarnException(e);
+      }
     }
 
     if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index cf10be4..8c2f4e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1113,8 +1113,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         Allocation amContainerAllocation =
             appAttempt.scheduler.allocate(
                 appAttempt.applicationAttemptId,
-                appAttempt.amReqs,
-                EMPTY_CONTAINER_RELEASE_LIST,
+                appAttempt.amReqs, null, EMPTY_CONTAINER_RELEASE_LIST,
                 amBlacklist.getBlacklistAdditions(),
                 amBlacklist.getBlacklistRemovals(),
                 new ContainerUpdates());
@@ -1140,7 +1139,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       // Acquire the AM container from the scheduler.
       Allocation amContainerAllocation =
           appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
-            EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
+            EMPTY_CONTAINER_REQUEST_LIST, null, EMPTY_CONTAINER_RELEASE_LIST, null,
             null, new ContainerUpdates());
       // There must be at least one container allocated, because a
       // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 72376df..7f81f00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
@@ -1155,7 +1156,7 @@ public abstract class AbstractYarnScheduler
    *
    * @param asks resource requests
    */
-  protected void normalizeRequests(List<ResourceRequest> asks) {
+  protected void normalizeResourceRequests(List<ResourceRequest> asks) {
     for (ResourceRequest ask: asks) {
       ask.setCapability(getNormalizedResource(ask.getCapability()));
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 8858d3b..7d6f233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -49,7 +51,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.Applicatio
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 /**
@@ -91,11 +95,12 @@ public class AppSchedulingInfo {
 
   public final ContainerUpdateContext updateContext;
   public final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
+  private final RMContext rmContext;
 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
       Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
       ResourceUsage appResourceUsage,
-      Map<String, String> applicationSchedulingEnvs) {
+      Map<String, String> applicationSchedulingEnvs, RMContext rmContext) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
@@ -105,6 +110,7 @@ public class AppSchedulingInfo {
         epoch << ResourceManager.EPOCH_BIT_SHIFT);
     this.appResourceUsage = appResourceUsage;
     this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs);
+    this.rmContext = rmContext;
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     updateContext = new ContainerUpdateContext(this);
@@ -163,74 +169,153 @@ public class AppSchedulingInfo {
    * application, by asking for more resources and releasing resources acquired
    * by the application.
    *
-   * @param requests
-   *          resources to be acquired
+   * @param resourceRequests resource requests to be allocated
    * @param recoverPreemptedRequestForAContainer
-   *          recover ResourceRequest on preemption
+   *          recover ResourceRequest/SchedulingRequest on preemption
    * @return true if any resource was updated, false otherwise
    */
-  public boolean updateResourceRequests(List<ResourceRequest> requests,
+  public boolean updateResourceRequests(List<ResourceRequest> resourceRequests,
       boolean recoverPreemptedRequestForAContainer) {
-    if (null == requests || requests.isEmpty()) {
-      return false;
+    // Flag to track if any incoming requests update "ANY" requests
+    boolean offswitchResourcesUpdated;
+
+    writeLock.lock();
+    try {
+      // Update AppPlacementAllocator by requests
+      offswitchResourcesUpdated = internalAddResourceRequests(
+          recoverPreemptedRequestForAContainer, resourceRequests);
+    } finally {
+      writeLock.unlock();
     }
 
+    return offswitchResourcesUpdated;
+  }
+
+  /**
+   * The ApplicationMaster is updating resource requirements for the
+   * application, by asking for more resources and releasing resources acquired
+   * by the application.
+   *
+   * @param dedupRequests (dedup) resource requests to be allocated
+   * @param recoverPreemptedRequestForAContainer
+   *          recover ResourceRequest/SchedulingRequest on preemption
+   * @return true if any resource was updated, false otherwise
+   */
+  public boolean updateResourceRequests(
+      Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests,
+      boolean recoverPreemptedRequestForAContainer) {
     // Flag to track if any incoming requests update "ANY" requests
-    boolean offswitchResourcesUpdated = false;
+    boolean offswitchResourcesUpdated;
 
+    writeLock.lock();
     try {
-      this.writeLock.lock();
-
-      // A map to group resource requests and dedup
-      Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests =
-          new HashMap<>();
+      // Update AppPlacementAllocator by requests
+      offswitchResourcesUpdated = internalAddResourceRequests(
+          recoverPreemptedRequestForAContainer, dedupRequests);
+    } finally {
+      writeLock.unlock();
+    }
 
-      // Group resource request by schedulerRequestKey and resourceName
-      for (ResourceRequest request : requests) {
-        SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
-        if (!dedupRequests.containsKey(schedulerKey)) {
-          dedupRequests.put(schedulerKey, new HashMap<>());
-        }
-        dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
-      }
+    return offswitchResourcesUpdated;
+  }
 
-      // Update AppPlacementAllocator by dedup requests.
-      offswitchResourcesUpdated =
-          addRequestToAppPlacement(
-              recoverPreemptedRequestForAContainer, dedupRequests);
+  /**
+   * The ApplicationMaster is updating resource requirements for the
+   * application, by asking for more resources and releasing resources acquired
+   * by the application.
+   *
+   * @param schedulingRequests scheduling requests to be allocated
+   * @param recoverPreemptedRequestForAContainer
+   *          recover ResourceRequest/SchedulingRequest on preemption
+   * @return true if any resource was updated, false otherwise
+   */
+  public boolean updateSchedulingRequests(
+      List<SchedulingRequest> schedulingRequests,
+      boolean recoverPreemptedRequestForAContainer) {
+    // Flag to track if any incoming requests update "ANY" requests
+    boolean offswitchResourcesUpdated;
 
-      return offswitchResourcesUpdated;
+    writeLock.lock();
+    try {
+      // Update AppPlacementAllocator by requests
+      offswitchResourcesUpdated = addSchedulingRequests(
+          recoverPreemptedRequestForAContainer, schedulingRequests);
     } finally {
-      this.writeLock.unlock();
+      writeLock.unlock();
     }
+
+    return offswitchResourcesUpdated;
   }
 
   public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) {
     schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey);
   }
 
-  boolean addRequestToAppPlacement(
+  private boolean addSchedulingRequests(
+      boolean recoverPreemptedRequestForAContainer,
+      List<SchedulingRequest> schedulingRequests) {
+    // Do we need to update pending resource for app/queue, etc.?
+    boolean requireUpdatePendingResource = false;
+
+    for (SchedulingRequest request : schedulingRequests) {
+      SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create(
+          request);
+
+      AppPlacementAllocator appPlacementAllocator =
+          getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey,
+              SingleConstraintAppPlacementAllocator.class.getCanonicalName());
+
+      // Update AppPlacementAllocator
+      PendingAskUpdateResult pendingAmountChanges =
+          appPlacementAllocator.updatePendingAsk(schedulerRequestKey,
+              request, recoverPreemptedRequestForAContainer);
+
+      if (null != pendingAmountChanges) {
+        updatePendingResources(pendingAmountChanges, schedulerRequestKey,
+            queue.getMetrics());
+        requireUpdatePendingResource = true;
+      }
+    }
+
+    return requireUpdatePendingResource;
+  }
+
+  /**
+   * Get the AppPlacementAllocator for the given key, creating and registering
+   * it if it doesn't exist. This should be called while holding the write lock.
+   * @param schedulerRequestKey schedulerRequestKey
+   * @param placementTypeClass placementTypeClass
+   * @return AppPlacementAllocator
+   */
+  private AppPlacementAllocator<SchedulerNode> getAndAddAppPlacementAllocatorIfNotExist(
+      SchedulerRequestKey schedulerRequestKey, String placementTypeClass) {
+    AppPlacementAllocator<SchedulerNode> appPlacementAllocator;
+    if ((appPlacementAllocator = schedulerKeyToAppPlacementAllocator.get(
+        schedulerRequestKey)) == null) {
+      appPlacementAllocator =
+          ApplicationPlacementAllocatorFactory.getAppPlacementAllocator(
+              placementTypeClass, this, schedulerRequestKey, rmContext);
+      schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
+          appPlacementAllocator);
+    }
+    return appPlacementAllocator;
+  }
+
+  private boolean internalAddResourceRequests(
       boolean recoverPreemptedRequestForAContainer,
       Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
     boolean offswitchResourcesUpdated = false;
     for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry :
     dedupRequests.entrySet()) {
       SchedulerRequestKey schedulerRequestKey = entry.getKey();
-
-      if (!schedulerKeyToAppPlacementAllocator
-          .containsKey(schedulerRequestKey)) {
-        AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = ApplicationPlacementFactory
-            .getAppPlacementAllocator(applicationSchedulingEnvs
-                .get(ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS));
-        placementAllocatorInstance.setAppSchedulingInfo(this);
-
-        schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
-            placementAllocatorInstance);
-      }
+      AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
+          getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey,
+              applicationSchedulingEnvs.get(
+                  ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS));
 
       // Update AppPlacementAllocator
-      PendingAskUpdateResult pendingAmountChanges = schedulerKeyToAppPlacementAllocator
-          .get(schedulerRequestKey).updatePendingAsk(entry.getValue().values(),
+      PendingAskUpdateResult pendingAmountChanges =
+          appPlacementAllocator.updatePendingAsk(entry.getValue().values(),
               recoverPreemptedRequestForAContainer);
 
       if (null != pendingAmountChanges) {
@@ -242,6 +327,29 @@ public class AppSchedulingInfo {
     return offswitchResourcesUpdated;
   }
 
+  private boolean internalAddResourceRequests(boolean recoverPreemptedRequestForAContainer,
+      List<ResourceRequest> resourceRequests) {
+    if (null == resourceRequests || resourceRequests.isEmpty()) {
+      return false;
+    }
+
+    // A map to group resource requests and dedup
+    Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests =
+        new HashMap<>();
+
+    // Group resource request by schedulerRequestKey and resourceName
+    for (ResourceRequest request : resourceRequests) {
+      SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
+      if (!dedupRequests.containsKey(schedulerKey)) {
+        dedupRequests.put(schedulerKey, new HashMap<>());
+      }
+      dedupRequests.get(schedulerKey).put(request.getResourceName(), request);
+    }
+
+    return internalAddResourceRequests(recoverPreemptedRequestForAContainer,
+        dedupRequests);
+  }
+
   private void updatePendingResources(PendingAskUpdateResult updateResult,
       SchedulerRequestKey schedulerKey, QueueMetrics metrics) {
 
@@ -629,13 +737,22 @@ public class AppSchedulingInfo {
     }
   }
 
-  public boolean acceptNodePartition(SchedulerRequestKey schedulerKey,
-      String nodePartition, SchedulingMode schedulingMode) {
+  /**
+   * Pre-check a node to see if it satisfies the given schedulerKey and
+   * scheduling mode.
+   *
+   * @param schedulerKey schedulerKey
+   * @param schedulerNode schedulerNode
+   * @param schedulingMode schedulingMode
+   * @return can use the node or not.
+   */
+  public boolean precheckNode(SchedulerRequestKey schedulerKey,
+      SchedulerNode schedulerNode, SchedulingMode schedulingMode) {
     try {
       this.readLock.lock();
       AppPlacementAllocator ap =
           schedulerKeyToAppPlacementAllocator.get(schedulerKey);
-      return (ap != null) && ap.acceptNodePartition(nodePartition,
+      return (ap != null) && ap.precheckNode(schedulerNode,
           schedulingMode);
     } finally {
       this.readLock.unlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java
new file mode 100644
index 0000000..a4e5484
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementAllocatorFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+/**
+ * Factory class to build various application placement policies.
+ */
+@Public
+@Unstable
+public class ApplicationPlacementAllocatorFactory {
+
+  /**
+   * Get AppPlacementAllocator related to the placement type requested.
+   *
+   * @param appPlacementAllocatorName
+   *          allocator class name.
+   * @return Specific AppPlacementAllocator instance based on type
+   */
+  public static AppPlacementAllocator<SchedulerNode> getAppPlacementAllocator(
+      String appPlacementAllocatorName, AppSchedulingInfo appSchedulingInfo,
+      SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
+    Class<?> policyClass;
+    try {
+      if (appPlacementAllocatorName == null) {
+        policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
+      } else {
+        policyClass = Class.forName(appPlacementAllocatorName);
+      }
+    } catch (ClassNotFoundException e) {
+      policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
+    }
+
+    if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) {
+      policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
+    }
+
+    @SuppressWarnings("unchecked")
+    AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = (AppPlacementAllocator<SchedulerNode>) ReflectionUtils
+        .newInstance(policyClass, null);
+    placementAllocatorInstance.initialize(appSchedulingInfo,
+        schedulerRequestKey, rmContext);
+    return placementAllocatorInstance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java
deleted file mode 100644
index 40c8d05..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
-
-/**
- * Factory class to build various application placement policies.
- */
-@Public
-@Unstable
-public class ApplicationPlacementFactory {
-
-  /**
-   * Get AppPlacementAllocator related to the placement type requested.
-   *
-   * @param appPlacementAllocatorName
-   *          allocator class name.
-   * @return Specific AppPlacementAllocator instance based on type
-   */
-  public static AppPlacementAllocator<SchedulerNode> getAppPlacementAllocator(
-      String appPlacementAllocatorName) {
-    Class<?> policyClass;
-    try {
-      if (appPlacementAllocatorName == null) {
-        policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
-      } else {
-        policyClass = Class.forName(appPlacementAllocatorName);
-      }
-    } catch (ClassNotFoundException e) {
-      policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
-    }
-
-    if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) {
-      policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
-    }
-
-    @SuppressWarnings("unchecked")
-    AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = (AppPlacementAllocator<SchedulerNode>) ReflectionUtils
-        .newInstance(policyClass, null);
-    return placementAllocatorInstance;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index f410db1..491a9ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -146,7 +146,7 @@ public class ContainerUpdateContext {
           createResourceRequests(rmContainer, schedulerNode,
               schedulerKey, resToIncrease);
       updateResReqs.put(schedulerKey, resMap);
-      appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs);
+      appSchedulingInfo.updateResourceRequests(updateResReqs, false);
     }
     return true;
   }
@@ -290,7 +290,7 @@ public class ContainerUpdateContext {
           (rmContainer, node, schedulerKey,
           rmContainer.getContainer().getResource());
       reqsToUpdate.put(schedulerKey, resMap);
-      appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate);
+      appSchedulingInfo.updateResourceRequests(reqsToUpdate, true);
       return UNDEFINED;
     }
     return retVal;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 3930a35..753c2b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -231,7 +232,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
 
     this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
         queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
-        applicationSchedulingEnvs);
+        applicationSchedulingEnvs, rmContext);
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
@@ -451,6 +452,23 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       writeLock.unlock();
     }
   }
+
+  public boolean updateSchedulingRequests(
+      List<SchedulingRequest> requests) {
+    if (requests == null) {
+      return false;
+    }
+
+    try {
+      writeLock.lock();
+      if (!isStopped) {
+        return appSchedulingInfo.updateSchedulingRequests(requests, false);
+      }
+      return false;
+    } finally {
+      writeLock.unlock();
+    }
+  }
   
   public void recoverResourceRequestsForContainer(
       ContainerRequest containerRequest) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index 93ca7c2..43d55c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -132,18 +133,19 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
    * 
    * @param appAttemptId
    * @param ask
+   * @param schedulingRequests
    * @param release
-   * @param blacklistAdditions 
-   * @param blacklistRemovals 
+   * @param blacklistAdditions
+   * @param blacklistRemovals
    * @param updateRequests
    * @return the {@link Allocation} for the application
    */
   @Public
   @Stable
   Allocation allocate(ApplicationAttemptId appAttemptId,
-      List<ResourceRequest> ask, List<ContainerId> release,
-      List<String> blacklistAdditions, List<String> blacklistRemovals,
-      ContainerUpdates updateRequests);
+      List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
+      List<ContainerId> release, List<String> blacklistAdditions,
+      List<String> blacklistRemovals, ContainerUpdates updateRequests);
 
   /**
    * Get node resource usage report.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d2713c8..c713036 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -60,8 +60,11 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -1058,12 +1061,29 @@ public class CapacityScheduler extends
     }
   }
 
+  /**
+   * Normalize the resources of every SchedulingRequest in the given list;
+   * requests with a null sizing or null sizing resources are left as is.
+   * @param asks scheduling requests, updated in place; null is ignored
+   */
+  private void normalizeSchedulingRequests(List<SchedulingRequest> asks) {
+    if (asks == null) {
+      return;
+    }
+    for (SchedulingRequest ask: asks) {
+      ResourceSizing sizing = ask.getResourceSizing();
+      if (sizing != null && sizing.getResources() != null) {
+        sizing.setResources(getNormalizedResource(sizing.getResources()));
+      }
+    }
+  }
+
   @Override
   @Lock(Lock.NoLock.class)
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
-      List<ResourceRequest> ask, List<ContainerId> release,
-      List<String> blacklistAdditions, List<String> blacklistRemovals,
-      ContainerUpdates updateRequests) {
+      List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
+      List<ContainerId> release, List<String> blacklistAdditions,
+      List<String> blacklistRemovals, ContainerUpdates updateRequests) {
     FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed or non existent application " +
@@ -1071,6 +1091,18 @@ public class CapacityScheduler extends
       return EMPTY_ALLOCATION;
     }
 
+    if ((!getConfiguration().getBoolean(
+        CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
+        CapacitySchedulerConfiguration.DEFAULT_SCHEDULING_REQUEST_ALLOWED))
+        && schedulingRequests != null && (!schedulingRequests.isEmpty())) {
+      throw new SchedulerInvalidResoureRequestException(
+          "Application attempt:" + applicationAttemptId
+              + " is using SchedulingRequest, which is disabled. Please update "
+              + CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED
+              + " to true in capacity-scheduler.xml in order to use this "
+              + "feature.");
+    }
+
     // The allocate may be the leftover from previous attempt, and it will
     // impact current attempt, such as confuse the request and allocation for
     // current attempt's AM container.
@@ -1091,7 +1123,10 @@ public class CapacityScheduler extends
     LeafQueue updateDemandForQueue = null;
 
     // Sanity check for new allocation requests
-    normalizeRequests(ask);
+    normalizeResourceRequests(ask);
+
+    // Normalize scheduling requests
+    normalizeSchedulingRequests(schedulingRequests);
 
     Allocation allocation;
 
@@ -1104,7 +1139,8 @@ public class CapacityScheduler extends
       }
 
       // Process resource requests
-      if (!ask.isEmpty()) {
+      if (!ask.isEmpty() || (schedulingRequests != null && !schedulingRequests
+          .isEmpty())) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(
               "allocate: pre-update " + applicationAttemptId + " ask size ="
@@ -1113,7 +1149,14 @@ public class CapacityScheduler extends
         }
 
         // Update application requests
-        if (application.updateResourceRequests(ask)) {
+        // Evaluate both updates before combining them: chaining the calls
+        // with a short-circuit || skips updateSchedulingRequests whenever
+        // updateResourceRequests returns true.
+        boolean resourceRequestsUpdated =
+            application.updateResourceRequests(ask);
+        boolean schedulingRequestsUpdated =
+            application.updateSchedulingRequests(schedulingRequests);
+        if (resourceRequestsUpdated || schedulingRequestsUpdated) {
           updateDemandForQueue = (LeafQueue) application.getQueue();
         }
 
@@ -2580,10 +2617,9 @@ public class CapacityScheduler extends
         // Validate placement constraint is satisfied before
         // committing the request.
         try {
-          if (!PlacementConstraintsUtil.canSatisfyConstraints(
+          if (!PlacementConstraintsUtil.canSatisfySingleConstraint(
               appAttempt.getApplicationId(),
-              schedulingRequest.getAllocationTags(),
-              schedulerNode,
+              schedulingRequest.getAllocationTags(), schedulerNode,
               rmContext.getPlacementConstraintManager(),
               rmContext.getAllocationTagsManager())) {
             LOG.debug("Failed to allocate container for application "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index e609be9..00733a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -77,6 +77,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   
   @Private
   public static final String PREFIX = "yarn.scheduler.capacity.";
+
+  @Private
+  public static final String SCHEDULING_REQUEST_ALLOWED =
+      PREFIX + "scheduling-request.allowed";
+  public static final boolean DEFAULT_SCHEDULING_REQUEST_ALLOWED = false;
   
   @Private
   public static final String DOT = ".";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 2642532..afa468b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -143,8 +143,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     // Is the nodePartition of pending request matches the node's partition
     // If not match, jump to next priority.
-    if (!appInfo.acceptNodePartition(schedulerKey, node.getPartition(),
-        schedulingMode)) {
+    if (!appInfo.precheckNode(schedulerKey, node, schedulingMode)) {
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
           activitiesManager, node, application, priority,
           ActivityDiagnosticConstant.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java
index 075db79..cad15a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 
 import java.util.List;
 
@@ -43,12 +44,43 @@ import java.util.List;
  */
 public class ContainerRequest {
-  private List<ResourceRequest> requests;
+  // Assigned once by whichever constructor is used; that constructor sets
+  // the other field to null.
+  private final List<ResourceRequest> requests;
+  private final SchedulingRequest schedulingRequest;
 
+  /**
+   * Create a container request from a list of resource requests.
+   *
+   * @param requests resource requests returned by getResourceRequests()
+   */
   public ContainerRequest(List<ResourceRequest> requests) {
     this.requests = requests;
+    this.schedulingRequest = null;
+  }
+
+  /**
+   * Create a container request from a single scheduling request.
+   *
+   * @param schedulingRequest request returned by getSchedulingRequest()
+   */
+  public ContainerRequest(SchedulingRequest schedulingRequest) {
+    this.schedulingRequest = schedulingRequest;
+    this.requests = null;
   }
 
+  /**
+   * @return the resource requests, or null when this instance was created
+   *         from a SchedulingRequest
+   */
   public List<ResourceRequest> getResourceRequests() {
     return requests;
   }
+
+  /**
+   * @return the scheduling request, or null when this instance was created
+   *         from a list of ResourceRequests
+   */
+  public SchedulingRequest getSchedulingRequest() {
+    return schedulingRequest;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
index 85d8715..2ed3e83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -31,6 +32,11 @@ public class PendingAsk {
   private final int count;
   public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0);
 
+  public PendingAsk(ResourceSizing sizing) {
+    this.perAllocationResource = sizing.getResources();
+    this.count = sizing.getNumAllocations();
+  }
+
   public PendingAsk(Resource res, int num) {
     this.perAllocationResource = res;
     this.count = num;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 4ea0347..7eb1e31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -542,6 +542,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
                 schedulerContainer.getRmContainer().getContainer());
             ((RMContainerImpl) rmContainer).setContainerRequest(
                 containerRequest);
+
+            // If this is from a SchedulingRequest, set allocation tags.
+            if (containerRequest.getSchedulingRequest() != null) {
+              ((RMContainerImpl) rmContainer).setAllocationTags(
+                  containerRequest.getSchedulingRequest().getAllocationTags());
+            }
           }
 
           attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
index 4bb3e79..962e548 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.log4j.Logger;
 
@@ -287,21 +288,15 @@ public class AllocationTagsManager {
    *                       {@link SchedulingRequest#getAllocationTags()}
    *                       application_id will be added to allocationTags.
    */
+  @SuppressWarnings("unchecked")
   public void addContainer(NodeId nodeId, ContainerId containerId,
       Set<String> allocationTags) {
+    // Do nothing for empty allocation tags.
+    if (allocationTags == null || allocationTags.isEmpty()) {
+      return;
+    }
     ApplicationId applicationId =
         containerId.getApplicationAttemptId().getApplicationId();
-    String applicationIdTag =
-        AllocationTagsNamespaces.APP_ID + applicationId.toString();
-
-    boolean useSet = false;
-    if (allocationTags != null && !allocationTags.isEmpty()) {
-      // Copy before edit it.
-      allocationTags = new HashSet<>(allocationTags);
-      allocationTags.add(applicationIdTag);
-      useSet = true;
-    }
-
     writeLock.lock();
     try {
       TypeToCountedTags perAppTagsMapping = perAppNodeMappings
@@ -311,19 +306,12 @@ public class AllocationTagsManager {
       // Covering test-cases where context is mocked
       String nodeRack = (rmContext.getRMNodes() != null
           && rmContext.getRMNodes().get(nodeId) != null)
-              ? rmContext.getRMNodes().get(nodeId).getRackName()
-              : "default-rack";
-      if (useSet) {
-        perAppTagsMapping.addTags(nodeId, allocationTags);
-        perAppRackTagsMapping.addTags(nodeRack, allocationTags);
-        globalNodeMapping.addTags(nodeId, allocationTags);
-        globalRackMapping.addTags(nodeRack, allocationTags);
-      } else {
-        perAppTagsMapping.addTag(nodeId, applicationIdTag);
-        perAppRackTagsMapping.addTag(nodeRack, applicationIdTag);
-        globalNodeMapping.addTag(nodeId, applicationIdTag);
-        globalRackMapping.addTag(nodeRack, applicationIdTag);
-      }
+              ? rmContext.getRMNodes().get(nodeId).getRackName() :
+          "default-rack";
+      perAppTagsMapping.addTags(nodeId, allocationTags);
+      perAppRackTagsMapping.addTags(nodeRack, allocationTags);
+      globalNodeMapping.addTags(nodeId, allocationTags);
+      globalRackMapping.addTags(nodeRack, allocationTags);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Added container=" + containerId + " with tags=["
@@ -341,20 +329,15 @@ public class AllocationTagsManager {
    * @param containerId    containerId.
    * @param allocationTags allocation tags for given container
    */
+  @SuppressWarnings("unchecked")
   public void removeContainer(NodeId nodeId,
       ContainerId containerId, Set<String> allocationTags) {
+    // Do nothing for empty allocation tags.
+    if (allocationTags == null || allocationTags.isEmpty()) {
+      return;
+    }
     ApplicationId applicationId =
         containerId.getApplicationAttemptId().getApplicationId();
-    String applicationIdTag =
-        AllocationTagsNamespaces.APP_ID + applicationId.toString();
-    boolean useSet = false;
-
-    if (allocationTags != null && !allocationTags.isEmpty()) {
-      // Copy before edit it.
-      allocationTags = new HashSet<>(allocationTags);
-      allocationTags.add(applicationIdTag);
-      useSet = true;
-    }
 
     writeLock.lock();
     try {
@@ -368,19 +351,12 @@ public class AllocationTagsManager {
       // Covering test-cases where context is mocked
       String nodeRack = (rmContext.getRMNodes() != null
           && rmContext.getRMNodes().get(nodeId) != null)
-              ? rmContext.getRMNodes().get(nodeId).getRackName()
-              : "default-rack";
-      if (useSet) {
-        perAppTagsMapping.removeTags(nodeId, allocationTags);
-        perAppRackTagsMapping.removeTags(nodeRack, allocationTags);
-        globalNodeMapping.removeTags(nodeId, allocationTags);
-        globalRackMapping.removeTags(nodeRack, allocationTags);
-      } else {
-        perAppTagsMapping.removeTag(nodeId, applicationIdTag);
-        perAppRackTagsMapping.removeTag(nodeRack, applicationIdTag);
-        globalNodeMapping.removeTag(nodeId, applicationIdTag);
-        globalRackMapping.removeTag(nodeRack, applicationIdTag);
-      }
+              ? rmContext.getRMNodes().get(nodeId).getRackName() :
+          "default-rack";
+      perAppTagsMapping.removeTags(nodeId, allocationTags);
+      perAppRackTagsMapping.removeTags(nodeRack, allocationTags);
+      globalNodeMapping.removeTags(nodeId, allocationTags);
+      globalRackMapping.removeTags(nodeRack, allocationTags);
 
       if (perAppTagsMapping.isEmpty()) {
         perAppNodeMappings.remove(applicationId);
@@ -602,6 +578,7 @@ public class AllocationTagsManager {
    * @throws InvalidAllocationTagsQueryException when illegal query
    *                                            parameter specified
    */
+  @SuppressWarnings("unchecked")
   public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
       Set<String> tags, LongBinaryOperator op)
       throws InvalidAllocationTagsQueryException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java
deleted file mode 100644
index 43fcfe5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- * /
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
-
-/**
- * Predefined namespaces for tags
- *
- * Same as namespace  of resource types. Namespaces of placement tags are start
- * with alphabets and ended with "/"
- */
-public class AllocationTagsNamespaces {
-  public static final String APP_ID = "yarn_app_id/";
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org