Posted to common-commits@hadoop.apache.org by kk...@apache.org on 2018/02/15 22:28:20 UTC

[4/4] hadoop git commit: YARN-7920. Simplify configuration for PlacementConstraints. Contributed by Wangda Tan.

YARN-7920. Simplify configuration for PlacementConstraints. Contributed by Wangda Tan.

(cherry picked from commit 0b489e564ce5a50324a530e29c18aa8a75276c50)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/41708402
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/41708402
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/41708402

Branch: refs/heads/branch-3.1
Commit: 41708402a1c1033d8829aad23db9cfe90de77acd
Parents: 4cdc57f
Author: Konstantinos Karanasos <kk...@apache.org>
Authored: Thu Feb 15 14:23:27 2018 -0800
Committer: Konstantinos Karanasos <kk...@apache.org>
Committed: Thu Feb 15 14:25:34 2018 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  54 ++-
 .../TestAMRMClientPlacementConstraints.java     |   3 +-
 .../src/main/resources/yarn-default.xml         |  10 +-
 .../ApplicationMasterService.java               |  46 ++-
 .../scheduler/capacity/CapacityScheduler.java   |  13 -
 .../CapacitySchedulerConfiguration.java         |   5 -
 .../processor/AbstractPlacementProcessor.java   |  96 +++++
 .../processor/DisabledPlacementProcessor.java   |  77 ++++
 .../processor/PlacementConstraintProcessor.java | 340 +++++++++++++++++
 .../processor/PlacementProcessor.java           | 377 -------------------
 .../processor/SchedulerPlacementProcessor.java  |  55 +++
 ...apacitySchedulerSchedulingRequestUpdate.java |   4 +
 ...estSchedulingRequestContainerAllocation.java |   8 +-
 ...hedulingRequestContainerAllocationAsync.java |   4 +-
 .../scheduler/capacity/TestUtils.java           |   4 +-
 .../constraint/TestPlacementProcessor.java      |  12 +-
 .../src/site/markdown/PlacementConstraints.md   | 136 +++++++
 .../site/markdown/PlacementConstraints.md.vm    | 149 --------
 18 files changed, 818 insertions(+), 575 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 118f9fb..6677478 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -532,11 +532,57 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_SCHEDULER = 
     RM_PREFIX + "scheduler.class";
 
-  /** Enable rich placement constraints. */
-  public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
-      RM_PREFIX + "placement-constraints.enabled";
+  /**
+   * Specify which handler will be used to process PlacementConstraints.
+   * For details on PlacementConstraints, please refer to
+   * {@link org.apache.hadoop.yarn.api.resource.PlacementConstraint}
+   */
+  @Private
+  public static final String RM_PLACEMENT_CONSTRAINTS_HANDLER =
+      RM_PREFIX + "placement-constraints.handler";
+
+  /**
+   * This handler rejects all allocate calls made by an application, if they
+   * contain a {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}.
+   */
+  @Private
+  public static final String DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER =
+      "disabled";
 
-  public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = false;
+  /**
+   * Using this handler, the placement of containers with constraints is
+   * determined as a pre-processing step before the capacity or the fair
+   * scheduler is called. Once the placement is decided, the capacity/fair
+   * scheduler is invoked to perform the actual allocation. The advantage of
+   * this approach is that it supports all constraint types (affinity,
+   * anti-affinity, cardinality). Moreover, it considers multiple containers at
+   * a time, which allows it to satisfy more constraints than a
+   * container-at-a-time approach can. As it sits outside the main scheduler,
+   * it can be used by both the capacity and fair schedulers. Note that at the
+   * moment it does not account for task priorities within an application,
+   * given that such priorities might conflict with the placement constraints.
+   */
+  @Private
+  public static final String PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER =
+      "placement-processor";
+
+  /**
+   * Using this handler, containers with constraints will be placed by the main
+   * scheduler. If the configured RM scheduler
+   * <pre>yarn.resourcemanager.scheduler.class</pre>
+   * cannot handle placement constraints, the corresponding SchedulingRequests
+   * will be rejected. As of now, only the capacity scheduler supports
+   * SchedulingRequests. In particular, it currently supports anti-affinity
+   * constraints (no affinity or cardinality) and places one container at a
+   * time. The advantage of this handler compared to the placement-processor is
+   * that it follows the same ordering rules for queues (sorted by utilization,
+   * priority) and apps (sorted by FIFO/fairness/priority) as the ones followed
+   * by the main scheduler.
+   */
+  @Private
+  public static final String
+      SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER =
+      "scheduler";
 
   /** Placement Algorithm. */
   public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS =
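
As a quick illustration of how the new key is consumed (a sketch only: the
driver class below is hypothetical, the constants are the ones introduced in
this hunk, and the usage mirrors the client test changed later in this
commit):

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class PlacementHandlerConfigSketch {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Select the handler that pre-processes placement constraints
        // before the capacity/fair scheduler is invoked.
        conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
            YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
        // The other accepted values, per the javadoc above, are "scheduler"
        // (SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER) and "disabled"
        // (DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER), the default.
        System.out.println(
            conf.get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER));
      }
    }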

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
index fdc8d58..0e88299 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
@@ -65,7 +65,8 @@ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
     // mismatches between client and server
     teardown();
     conf = new YarnConfiguration();
-    conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
     createClusterAndStartApplication(conf);
 
     AMRMClient<AMRMClient.ContainerRequest> amClient =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 509a040..adf8d8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -131,9 +131,13 @@
   </property>
 
   <property>
-    <description>Enable Constraint Placement.</description>
-    <name>yarn.resourcemanager.placement-constraints.enabled</name>
-    <value>false</value>
+    <description>
+      Specify which handler will be used to process PlacementConstraints.
+      Acceptable values are: `placement-processor`, `scheduler` and `disabled`.
+      For a detailed explanation of these values, please refer to the documentation.
+    </description>
+    <name>yarn.resourcemanager.placement-constraints.handler</name>
+    <value>disabled</value>
   </property>
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index aa1177d..ae28879 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementProcessor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -67,6 +66,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AbstractPlacementProcessor;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.DisabledPlacementProcessor;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementConstraintProcessor;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SchedulerPlacementProcessor;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
@@ -118,20 +121,47 @@ public class ApplicationMasterService extends AbstractService implements
     initializeProcessingChain(conf);
   }
 
+  private void addPlacementConstraintHandler(Configuration conf) {
+    String placementConstraintsHandler =
+        conf.get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+            YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    if (placementConstraintsHandler
+        .equals(YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER)) {
+      LOG.info(YarnConfiguration.DISABLED_RM_PLACEMENT_CONSTRAINTS_HANDLER
+          + " placement handler will be used, all scheduling requests will "
+          + "be rejected.");
+      amsProcessingChain.addProcessor(new DisabledPlacementProcessor());
+    } else if (placementConstraintsHandler
+        .equals(YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER)) {
+      LOG.info(YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER
+          + " placement handler will be used. Scheduling requests will be "
+          + "handled by the placement constraint processor");
+      amsProcessingChain.addProcessor(new PlacementConstraintProcessor());
+    } else if (placementConstraintsHandler
+        .equals(YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER)) {
+      LOG.info(YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER
+          + " placement handler will be used. Scheduling requests will be "
+          + "handled by the main scheduler.");
+      amsProcessingChain.addProcessor(new SchedulerPlacementProcessor());
+    }
+  }
+
   private void initializeProcessingChain(Configuration conf) {
     amsProcessingChain.init(rmContext, null);
-    boolean enablePlacementConstraints = conf.getBoolean(
-        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED,
-        YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED);
-    if (enablePlacementConstraints) {
-      amsProcessingChain.addProcessor(new PlacementProcessor());
-    }
+    addPlacementConstraintHandler(conf);
+
     List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);
     if (processors != null) {
       Collections.reverse(processors);
       for (ApplicationMasterServiceProcessor p : processors) {
         // Ensure only single instance of PlacementProcessor is included
-        if (enablePlacementConstraints && p instanceof PlacementProcessor) {
+        if (p instanceof AbstractPlacementProcessor) {
+          LOG.warn("Found PlacementProcessor=" + p.getClass().getCanonicalName()
+              + " defined in "
+              + YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS
+              + ", however PlacementProcessor handler should be configured "
+              + "by using " + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
+              + ", this processor will be ignored.");
           continue;
         }
         this.amsProcessingChain.addProcessor(p);
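
For context on the chain being assembled here: entries configured under
YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS are ordinary
ApplicationMasterServiceProcessor implementations, and a placement processor
listed there is now ignored in favour of the new handler key. A rough
pass-through sketch (the class is hypothetical; the method signatures match
the processors added in this patch):

    import java.io.IOException;

    import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
    import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
    import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.exceptions.YarnException;

    /** Hypothetical pass-through processor; every call is forwarded as-is. */
    public class NoOpAMSProcessor implements ApplicationMasterServiceProcessor {
      private ApplicationMasterServiceProcessor next;

      @Override
      public void init(ApplicationMasterServiceContext amsContext,
          ApplicationMasterServiceProcessor nextProcessor) {
        this.next = nextProcessor;
      }

      @Override
      public void registerApplicationMaster(ApplicationAttemptId attemptId,
          RegisterApplicationMasterRequest request,
          RegisterApplicationMasterResponse response)
          throws IOException, YarnException {
        next.registerApplicationMaster(attemptId, request, response);
      }

      @Override
      public void allocate(ApplicationAttemptId attemptId,
          AllocateRequest request, AllocateResponse response)
          throws YarnException {
        next.allocate(attemptId, request, response);
      }

      @Override
      public void finishApplicationMaster(ApplicationAttemptId attemptId,
          FinishApplicationMasterRequest request,
          FinishApplicationMasterResponse response) {
        next.finishApplicationMaster(attemptId, request, response);
      }
    }

Each AM call simply flows through every configured processor in order, which
is why a placement processor configured through the processors list would be
redundant with the one selected by the handler key.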

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cd9d1373..ddab0c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -1098,18 +1097,6 @@ public class CapacityScheduler extends
       return EMPTY_ALLOCATION;
     }
 
-    if ((!getConfiguration().getBoolean(
-        CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
-        CapacitySchedulerConfiguration.DEFAULT_SCHEDULING_REQUEST_ALLOWED))
-        && schedulingRequests != null && (!schedulingRequests.isEmpty())) {
-      throw new SchedulerInvalidResoureRequestException(
-          "Application attempt:" + applicationAttemptId
-              + " is using SchedulingRequest, which is disabled. Please update "
-              + CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED
-              + " to true in capacity-scheduler.xml in order to use this "
-              + "feature.");
-    }
-
     // The allocate may be the leftover from previous attempt, and it will
     // impact current attempt, such as confuse the request and allocation for
     // current attempt's AM container.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 00733a1..e609be9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -77,11 +77,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   
   @Private
   public static final String PREFIX = "yarn.scheduler.capacity.";
-
-  @Private
-  public static final String SCHEDULING_REQUEST_ALLOWED =
-      PREFIX + "scheduling-request.allowed";
-  public static final boolean DEFAULT_SCHEDULING_REQUEST_ALLOWED = false;
   
   @Private
   public static final String DOT = ".";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java
new file mode 100644
index 0000000..96ae623
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/AbstractPlacementProcessor.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for all PlacementProcessors.
+ */
+public abstract class AbstractPlacementProcessor implements
+    ApplicationMasterServiceProcessor{
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractPlacementProcessor.class);
+
+  protected ApplicationMasterServiceProcessor nextAMSProcessor;
+  protected AbstractYarnScheduler scheduler;
+  private PlacementConstraintManager constraintManager;
+
+  @Override
+  public void init(ApplicationMasterServiceContext amsContext,
+      ApplicationMasterServiceProcessor nextProcessor) {
+    this.nextAMSProcessor = nextProcessor;
+    this.scheduler =
+        (AbstractYarnScheduler) ((RMContextImpl) amsContext).getScheduler();
+    this.constraintManager =
+        ((RMContextImpl)amsContext).getPlacementConstraintManager();
+  }
+
+  @Override
+  public void registerApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      RegisterApplicationMasterRequest request,
+      RegisterApplicationMasterResponse response)
+      throws IOException, YarnException {
+    Map<Set<String>, PlacementConstraint> appPlacementConstraints =
+        request.getPlacementConstraints();
+    processPlacementConstraints(applicationAttemptId.getApplicationId(),
+        appPlacementConstraints);
+    nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request,
+        response);
+  }
+
+  private void processPlacementConstraints(ApplicationId applicationId,
+      Map<Set<String>, PlacementConstraint> appPlacementConstraints) {
+    if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) {
+      LOG.info("Constraints added for application [{}] against tags [{}]",
+          applicationId, appPlacementConstraints);
+      constraintManager.registerApplication(
+          applicationId, appPlacementConstraints);
+    }
+  }
+
+  @Override
+  public void finishApplicationMaster(ApplicationAttemptId applicationAttemptId,
+      FinishApplicationMasterRequest request,
+      FinishApplicationMasterResponse response) {
+    constraintManager.unregisterApplication(
+        applicationAttemptId.getApplicationId());
+    this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId, request,
+        response);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java
new file mode 100644
index 0000000..0d093a7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/DisabledPlacementProcessor.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Processor that rejects all SchedulingRequests.
+ */
+public class DisabledPlacementProcessor extends AbstractPlacementProcessor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DisabledPlacementProcessor.class);
+
+  @Override
+  public void registerApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      RegisterApplicationMasterRequest request,
+      RegisterApplicationMasterResponse response)
+      throws IOException, YarnException {
+    if (request.getPlacementConstraints() != null && !request
+        .getPlacementConstraints().isEmpty()) {
+      String message = "Found non empty placement constraints map in "
+          + "RegisterApplicationMasterRequest for application="
+          + applicationAttemptId.toString() + ", but the configured "
+          + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
+          + " cannot handle placement constraints. Rejecting this "
+          + "registerApplicationMaster operation";
+      LOG.warn(message);
+      throw new YarnException(message);
+    }
+    nextAMSProcessor.registerApplicationMaster(applicationAttemptId, request,
+        response);
+  }
+
+  @Override
+  public void allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse response) throws YarnException {
+    if (request.getSchedulingRequests() != null && !request
+        .getSchedulingRequests().isEmpty()) {
+      String message = "Found non empty SchedulingRequest in "
+          + "AllocateRequest for application="
+          + appAttemptId.toString() + ", but the configured "
+          + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER
+          + " cannot handle placement constraints. Rejecting this "
+          + "allocate operation";
+      LOG.warn(message);
+      throw new YarnException(message);
+    }
+    nextAMSProcessor.allocate(appAttemptId, request, response);
+  }
+}
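
From the application master's side, the effect of this (default) handler is
that an allocate call carrying SchedulingRequests fails with a YarnException.
A hedged sketch, assuming the AMRMClient#addSchedulingRequests API that
accompanies SchedulingRequest support, and a client that is already
initialized, started and registered (names and sizes are illustrative):

    import java.util.Collections;

    import org.apache.hadoop.yarn.api.records.ExecutionType;
    import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceSizing;
    import org.apache.hadoop.yarn.api.records.SchedulingRequest;
    import org.apache.hadoop.yarn.client.api.AMRMClient;
    import org.apache.hadoop.yarn.exceptions.YarnException;

    public class DisabledHandlerSketch {
      static void tryConstrainedAllocate(
          AMRMClient<AMRMClient.ContainerRequest> amClient) throws Exception {
        SchedulingRequest request = SchedulingRequest.newInstance(
            0L, Priority.newInstance(1),
            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
            Collections.singleton("worker"),
            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
            null /* no per-request constraint */);
        amClient.addSchedulingRequests(Collections.singletonList(request));
        try {
          amClient.allocate(0.1f);
        } catch (YarnException e) {
          // Expected while the handler is "disabled": the
          // DisabledPlacementProcessor rejects any AllocateRequest that
          // carries SchedulingRequests.
        }
      }
    }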

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
new file mode 100644
index 0000000..f089a19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
+import org.apache.hadoop.yarn.api.records.RejectionReason;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An ApplicationMasterServiceProcessor that performs Constrained placement of
+ * Scheduling Requests. It does the following:
+ * 1. All initialization.
+ * 2. Intercepts placement constraints from the register call and adds them
+ *    to the placement constraint manager.
+ * 3. Dispatches Scheduling Requests to the Planner.
+ */
+public class PlacementConstraintProcessor extends AbstractPlacementProcessor {
+
+  /**
+   * Wrapper over the SchedulingResponse that wires in the placement attempt
+   * and last attempted Node.
+   */
+  static final class Response extends SchedulingResponse {
+
+    private final int placementAttempt;
+    private final SchedulerNode attemptedNode;
+
+    private Response(boolean isSuccess, ApplicationId applicationId,
+        SchedulingRequest schedulingRequest, int placementAttempt,
+        SchedulerNode attemptedNode) {
+      super(isSuccess, applicationId, schedulingRequest);
+      this.placementAttempt = placementAttempt;
+      this.attemptedNode = attemptedNode;
+    }
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PlacementConstraintProcessor.class);
+
+  private ExecutorService schedulingThreadPool;
+  private int retryAttempts;
+  private Map<ApplicationId, List<BatchedRequests>> requestsToRetry =
+      new ConcurrentHashMap<>();
+  private Map<ApplicationId, List<SchedulingRequest>> requestsToReject =
+      new ConcurrentHashMap<>();
+
+  private BatchedRequests.IteratorType iteratorType;
+  private PlacementDispatcher placementDispatcher;
+
+
+  @Override
+  public void init(ApplicationMasterServiceContext amsContext,
+      ApplicationMasterServiceProcessor nextProcessor) {
+    LOG.info("Initializing Constraint Placement Processor:");
+    super.init(amsContext, nextProcessor);
+
+    // Only the first class is considered - even if a comma separated
+    // list is provided. (This is for simplicity, since getInstances does a
+    // lot of good things by handling things correctly)
+    List<ConstraintPlacementAlgorithm> instances =
+        ((RMContextImpl) amsContext).getYarnConfiguration().getInstances(
+            YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS,
+            ConstraintPlacementAlgorithm.class);
+    ConstraintPlacementAlgorithm algorithm = null;
+    if (instances != null && !instances.isEmpty()) {
+      algorithm = instances.get(0);
+    } else {
+      algorithm = new DefaultPlacementAlgorithm();
+    }
+    LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName());
+
+    String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration()
+        .get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR,
+            BatchedRequests.IteratorType.SERIAL.name());
+    LOG.info("Placement Algorithm Iterator[{}]", iteratorName);
+    try {
+      iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName);
+    } catch (IllegalArgumentException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate Placement Algorithm Iterator: ", e);
+    }
+
+    int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE,
+        YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE);
+    this.placementDispatcher = new PlacementDispatcher();
+    this.placementDispatcher.init(
+        ((RMContextImpl)amsContext), algorithm, algoPSize);
+    LOG.info("Planning Algorithm pool size [{}]", algoPSize);
+
+    int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE,
+        YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE);
+    this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize);
+    LOG.info("Scheduler pool size [{}]", schedPSize);
+
+    // Number of times a request that is not satisfied by the scheduler
+    // can be retried.
+    this.retryAttempts =
+        ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
+            YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
+            YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
+    LOG.info("Num retry attempts [{}]", this.retryAttempts);
+  }
+
+  @Override
+  public void allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse response) throws YarnException {
+    // Copy the scheduling request since we will clear it later after sending
+    // to dispatcher
+    List<SchedulingRequest> schedulingRequests =
+        new ArrayList<>(request.getSchedulingRequests());
+    dispatchRequestsForPlacement(appAttemptId, schedulingRequests);
+    reDispatchRetryableRequests(appAttemptId);
+    schedulePlacedRequests(appAttemptId);
+
+    // Remove the SchedulingRequests from the AllocateRequest so they are not
+    // also added to the scheduler.
+    request.setSchedulingRequests(Collections.emptyList());
+
+    nextAMSProcessor.allocate(appAttemptId, request, response);
+
+    handleRejectedRequests(appAttemptId, response);
+  }
+
+  private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
+      List<SchedulingRequest> schedulingRequests) {
+    if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
+      // Normalize the Requests before dispatching
+      schedulingRequests.forEach(req -> {
+        Resource reqResource = req.getResourceSizing().getResources();
+        req.getResourceSizing()
+            .setResources(this.scheduler.getNormalizedResource(reqResource));
+      });
+      this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
+          appAttemptId.getApplicationId(), schedulingRequests, 1));
+    }
+  }
+
+  private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) {
+    List<BatchedRequests> reqsToRetry =
+        this.requestsToRetry.get(appAttId.getApplicationId());
+    if (reqsToRetry != null && !reqsToRetry.isEmpty()) {
+      synchronized (reqsToRetry) {
+        for (BatchedRequests bReq: reqsToRetry) {
+          this.placementDispatcher.dispatch(bReq);
+        }
+        reqsToRetry.clear();
+      }
+    }
+  }
+
+  private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
+    ApplicationId applicationId = appAttemptId.getApplicationId();
+    List<PlacedSchedulingRequest> placedSchedulingRequests =
+        this.placementDispatcher.pullPlacedRequests(applicationId);
+    for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) {
+      SchedulingRequest sReq = placedReq.getSchedulingRequest();
+      for (SchedulerNode node : placedReq.getNodes()) {
+        final SchedulingRequest sReqClone =
+            SchedulingRequest.newInstance(sReq.getAllocationRequestId(),
+                sReq.getPriority(), sReq.getExecutionType(),
+                sReq.getAllocationTags(),
+                ResourceSizing.newInstance(
+                    sReq.getResourceSizing().getResources()),
+                sReq.getPlacementConstraint());
+        SchedulerApplicationAttempt applicationAttempt =
+            this.scheduler.getApplicationAttempt(appAttemptId);
+        Runnable task = () -> {
+          boolean success =
+              scheduler.attemptAllocationOnNode(
+                  applicationAttempt, sReqClone, node);
+          if (!success) {
+            LOG.warn("Unsuccessful allocation attempt [{}] for [{}]",
+                placedReq.getPlacementAttempt(), sReqClone);
+          }
+          handleSchedulingResponse(
+              new Response(success, applicationId, sReqClone,
+              placedReq.getPlacementAttempt(), node));
+        };
+        this.schedulingThreadPool.submit(task);
+      }
+    }
+  }
+
+  private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
+      AllocateResponse response) {
+    List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests =
+        this.placementDispatcher.pullRejectedRequests(
+            appAttemptId.getApplicationId());
+    if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) {
+      LOG.warn("Following requests of [{}] were rejected by" +
+              " the PlacementAlgorithmOutput Algorithm: {}",
+          appAttemptId.getApplicationId(), rejectedAlgoRequests);
+      rejectedAlgoRequests.stream()
+          .filter(req -> req.getPlacementAttempt() < retryAttempts)
+          .forEach(req -> handleSchedulingResponse(
+              new Response(false, appAttemptId.getApplicationId(),
+                  req.getSchedulingRequest(), req.getPlacementAttempt(),
+                  null)));
+      ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
+          rejectedAlgoRequests.stream()
+              .filter(req -> req.getPlacementAttempt() >= retryAttempts)
+              .map(sr -> RejectedSchedulingRequest.newInstance(
+                  RejectionReason.COULD_NOT_PLACE_ON_NODE,
+                  sr.getSchedulingRequest()))
+              .collect(Collectors.toList()));
+    }
+    List<SchedulingRequest> rejectedRequests =
+        this.requestsToReject.get(appAttemptId.getApplicationId());
+    if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
+      synchronized (rejectedRequests) {
+        LOG.warn("Following requests of [{}] exhausted all retry attempts " +
+                "trying to schedule on placed node: {}",
+            appAttemptId.getApplicationId(), rejectedRequests);
+        ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
+            rejectedRequests.stream()
+                .map(sr -> RejectedSchedulingRequest.newInstance(
+                    RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr))
+                .collect(Collectors.toList()));
+        rejectedRequests.clear();
+      }
+    }
+  }
+
+  @Override
+  public void finishApplicationMaster(ApplicationAttemptId appAttemptId,
+      FinishApplicationMasterRequest request,
+      FinishApplicationMasterResponse response) {
+    placementDispatcher.clearApplicationState(appAttemptId.getApplicationId());
+    requestsToReject.remove(appAttemptId.getApplicationId());
+    requestsToRetry.remove(appAttemptId.getApplicationId());
+    super.finishApplicationMaster(appAttemptId, request, response);
+  }
+
+  private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {
+    int placementAttempt = ((Response)schedulerResponse).placementAttempt;
+    // Retry this placement as it is not successful and we are still
+    // under max retry. The req is batched with other unsuccessful
+    // requests from the same app
+    if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) {
+      List<BatchedRequests> reqsToRetry =
+          requestsToRetry.computeIfAbsent(
+              schedulerResponse.getApplicationId(),
+              k -> new ArrayList<>());
+      synchronized (reqsToRetry) {
+        addToRetryList(schedulerResponse, placementAttempt, reqsToRetry);
+      }
+      LOG.warn("Going to retry request for application [{}] after [{}]" +
+              " attempts: [{}]", schedulerResponse.getApplicationId(),
+          placementAttempt, schedulerResponse.getSchedulingRequest());
+    } else {
+      if (!schedulerResponse.isSuccess()) {
+        LOG.warn("Not retrying request for application [{}] after [{}]" +
+                " attempts: [{}]", schedulerResponse.getApplicationId(),
+            placementAttempt, schedulerResponse.getSchedulingRequest());
+        List<SchedulingRequest> reqsToReject =
+            requestsToReject.computeIfAbsent(
+                schedulerResponse.getApplicationId(),
+                k -> new ArrayList<>());
+        synchronized (reqsToReject) {
+          reqsToReject.add(schedulerResponse.getSchedulingRequest());
+        }
+      }
+    }
+  }
+
+  private void addToRetryList(SchedulingResponse schedulerResponse,
+      int placementAttempt, List<BatchedRequests> reqsToRetry) {
+    boolean isAdded = false;
+    for (BatchedRequests br : reqsToRetry) {
+      if (br.getPlacementAttempt() == placementAttempt + 1) {
+        br.addToBatch(schedulerResponse.getSchedulingRequest());
+        br.addToBlacklist(
+            schedulerResponse.getSchedulingRequest().getAllocationTags(),
+            ((Response) schedulerResponse).attemptedNode);
+        isAdded = true;
+        break;
+      }
+    }
+    if (!isAdded) {
+      BatchedRequests br = new BatchedRequests(iteratorType,
+          schedulerResponse.getApplicationId(),
+          Collections.singleton(schedulerResponse.getSchedulingRequest()),
+          placementAttempt + 1);
+      reqsToRetry.add(br);
+      br.addToBlacklist(
+          schedulerResponse.getSchedulingRequest().getAllocationTags(),
+          ((Response) schedulerResponse).attemptedNode);
+    }
+  }
+}
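
To connect the processor back to the API it consumes: applications opt in by
sending SchedulingRequests, each carrying a set of allocation tags and,
optionally, a PlacementConstraint. A rough sketch of an anti-affinity request,
assuming the PlacementConstraints helper class in
org.apache.hadoop.yarn.api.resource (tag names and sizings are illustrative;
the SchedulingRequest.newInstance signature is the one used above):

    import java.util.Collections;

    import org.apache.hadoop.yarn.api.records.ExecutionType;
    import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceSizing;
    import org.apache.hadoop.yarn.api.records.SchedulingRequest;
    import org.apache.hadoop.yarn.api.resource.PlacementConstraint;

    import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
    import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.build;
    import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
    import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;

    /** Hypothetical helper showing the shape of a constrained request. */
    public class AntiAffinityRequestSketch {
      public static SchedulingRequest newAntiAffinityRequest() {
        // Anti-affinity: no two containers tagged "hbase-rs" on the same node.
        PlacementConstraint antiAffinity =
            build(targetNotIn(NODE, allocationTag("hbase-rs")));
        return SchedulingRequest.newInstance(
            1L,                                  // allocationRequestId
            Priority.newInstance(1),
            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
            Collections.singleton("hbase-rs"),   // allocation tags
            ResourceSizing.newInstance(4, Resource.newInstance(2048, 2)),
            antiAffinity);
      }
    }

With yarn.resourcemanager.placement-constraints.handler set to
placement-processor, such requests are batched, handed to the placement
algorithm, and the chosen nodes are then tried through the scheduler;
requests that exhaust their retry attempts come back as
RejectedSchedulingRequests in the AllocateResponse.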

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
deleted file mode 100644
index 9ce38f4..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
-
-import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
-import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
-import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
-import org.apache.hadoop.yarn.api.records.RejectionReason;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceSizing;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
-
-/**
- * An ApplicationMasterService Processor that performs Constrained placement of
- * Scheduling Requests. It does the following:
- * 1. All initialization.
- * 2. Intercepts placement constraints from the register call and adds it to
- *    the placement constraint manager.
- * 3. Dispatches Scheduling Requests to the Planner.
- */
-public class PlacementProcessor implements ApplicationMasterServiceProcessor {
-
-  /**
-   * Wrapper over the SchedulingResponse that wires in the placement attempt
-   * and last attempted Node.
-   */
-  static final class Response extends SchedulingResponse {
-
-    private final int placementAttempt;
-    private final SchedulerNode attemptedNode;
-
-    private Response(boolean isSuccess, ApplicationId applicationId,
-        SchedulingRequest schedulingRequest, int placementAttempt,
-        SchedulerNode attemptedNode) {
-      super(isSuccess, applicationId, schedulingRequest);
-      this.placementAttempt = placementAttempt;
-      this.attemptedNode = attemptedNode;
-    }
-  }
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PlacementProcessor.class);
-  private PlacementConstraintManager constraintManager;
-  private ApplicationMasterServiceProcessor nextAMSProcessor;
-
-  private AbstractYarnScheduler scheduler;
-  private ExecutorService schedulingThreadPool;
-  private int retryAttempts;
-  private Map<ApplicationId, List<BatchedRequests>> requestsToRetry =
-      new ConcurrentHashMap<>();
-  private Map<ApplicationId, List<SchedulingRequest>> requestsToReject =
-      new ConcurrentHashMap<>();
-
-  private BatchedRequests.IteratorType iteratorType;
-  private PlacementDispatcher placementDispatcher;
-
-
-  @Override
-  public void init(ApplicationMasterServiceContext amsContext,
-      ApplicationMasterServiceProcessor nextProcessor) {
-    LOG.info("Initializing Constraint Placement Processor:");
-    this.nextAMSProcessor = nextProcessor;
-    this.constraintManager =
-        ((RMContextImpl)amsContext).getPlacementConstraintManager();
-
-    this.scheduler =
-        (AbstractYarnScheduler)((RMContextImpl)amsContext).getScheduler();
-    // Only the first class is considered - even if a comma separated
-    // list is provided. (This is for simplicity, since getInstances does a
-    // lot of good things by handling things correctly)
-    List<ConstraintPlacementAlgorithm> instances =
-        ((RMContextImpl) amsContext).getYarnConfiguration().getInstances(
-            YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS,
-            ConstraintPlacementAlgorithm.class);
-    ConstraintPlacementAlgorithm algorithm = null;
-    if (instances != null && !instances.isEmpty()) {
-      algorithm = instances.get(0);
-    } else {
-      algorithm = new DefaultPlacementAlgorithm();
-    }
-    LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName());
-
-    String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration()
-        .get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR,
-            BatchedRequests.IteratorType.SERIAL.name());
-    LOG.info("Placement Algorithm Iterator[{}]", iteratorName);
-    try {
-      iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName);
-    } catch (IllegalArgumentException e) {
-      throw new YarnRuntimeException(
-          "Could not instantiate Placement Algorithm Iterator: ", e);
-    }
-
-    int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
-        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE,
-        YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE);
-    this.placementDispatcher = new PlacementDispatcher();
-    this.placementDispatcher.init(
-        ((RMContextImpl)amsContext), algorithm, algoPSize);
-    LOG.info("Planning Algorithm pool size [{}]", algoPSize);
-
-    int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
-        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE,
-        YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE);
-    this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize);
-    LOG.info("Scheduler pool size [{}]", schedPSize);
-
-    // Number of times a request that is not satisfied by the scheduler
-    // can be retried.
-    this.retryAttempts =
-        ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
-            YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
-            YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
-    LOG.info("Num retry attempts [{}]", this.retryAttempts);
-  }
-
-  @Override
-  public void registerApplicationMaster(ApplicationAttemptId appAttemptId,
-      RegisterApplicationMasterRequest request,
-      RegisterApplicationMasterResponse response)
-      throws IOException, YarnException {
-    Map<Set<String>, PlacementConstraint> appPlacementConstraints =
-        request.getPlacementConstraints();
-    processPlacementConstraints(
-        appAttemptId.getApplicationId(), appPlacementConstraints);
-    nextAMSProcessor.registerApplicationMaster(appAttemptId, request, response);
-  }
-
-  private void processPlacementConstraints(ApplicationId applicationId,
-      Map<Set<String>, PlacementConstraint> appPlacementConstraints) {
-    if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) {
-      LOG.info("Constraints added for application [{}] against tags [{}]",
-          applicationId, appPlacementConstraints);
-      constraintManager.registerApplication(
-          applicationId, appPlacementConstraints);
-    }
-  }
-
-  @Override
-  public void allocate(ApplicationAttemptId appAttemptId,
-      AllocateRequest request, AllocateResponse response) throws YarnException {
-    // Copy the scheduling request since we will clear it later after sending
-    // to dispatcher
-    List<SchedulingRequest> schedulingRequests =
-        new ArrayList<>(request.getSchedulingRequests());
-    dispatchRequestsForPlacement(appAttemptId, schedulingRequests);
-    reDispatchRetryableRequests(appAttemptId);
-    schedulePlacedRequests(appAttemptId);
-
-    // Remove SchedulingRequest from AllocateRequest to avoid SchedulingRequest
-    // added to scheduler.
-    request.setSchedulingRequests(Collections.emptyList());
-
-    nextAMSProcessor.allocate(appAttemptId, request, response);
-
-    handleRejectedRequests(appAttemptId, response);
-  }
-
-  private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
-      List<SchedulingRequest> schedulingRequests) {
-    if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
-      // Normalize the Requests before dispatching
-      schedulingRequests.forEach(req -> {
-        Resource reqResource = req.getResourceSizing().getResources();
-        req.getResourceSizing()
-            .setResources(this.scheduler.getNormalizedResource(reqResource));
-      });
-      this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
-          appAttemptId.getApplicationId(), schedulingRequests, 1));
-    }
-  }
-
-  private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) {
-    List<BatchedRequests> reqsToRetry =
-        this.requestsToRetry.get(appAttId.getApplicationId());
-    if (reqsToRetry != null && !reqsToRetry.isEmpty()) {
-      synchronized (reqsToRetry) {
-        for (BatchedRequests bReq: reqsToRetry) {
-          this.placementDispatcher.dispatch(bReq);
-        }
-        reqsToRetry.clear();
-      }
-    }
-  }
-
-  private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
-    ApplicationId applicationId = appAttemptId.getApplicationId();
-    List<PlacedSchedulingRequest> placedSchedulingRequests =
-        this.placementDispatcher.pullPlacedRequests(applicationId);
-    for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) {
-      SchedulingRequest sReq = placedReq.getSchedulingRequest();
-      for (SchedulerNode node : placedReq.getNodes()) {
-        final SchedulingRequest sReqClone =
-            SchedulingRequest.newInstance(sReq.getAllocationRequestId(),
-                sReq.getPriority(), sReq.getExecutionType(),
-                sReq.getAllocationTags(),
-                ResourceSizing.newInstance(
-                    sReq.getResourceSizing().getResources()),
-                sReq.getPlacementConstraint());
-        SchedulerApplicationAttempt applicationAttempt =
-            this.scheduler.getApplicationAttempt(appAttemptId);
-        Runnable task = () -> {
-          boolean success =
-              scheduler.attemptAllocationOnNode(
-                  applicationAttempt, sReqClone, node);
-          if (!success) {
-            LOG.warn("Unsuccessful allocation attempt [{}] for [{}]",
-                placedReq.getPlacementAttempt(), sReqClone);
-          }
-          handleSchedulingResponse(
-              new Response(success, applicationId, sReqClone,
-              placedReq.getPlacementAttempt(), node));
-        };
-        this.schedulingThreadPool.submit(task);
-      }
-    }
-  }
-
-  private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
-      AllocateResponse response) {
-    List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests =
-        this.placementDispatcher.pullRejectedRequests(
-            appAttemptId.getApplicationId());
-    if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) {
-      LOG.warn("Following requests of [{}] were rejected by" +
-              " the PlacementAlgorithmOutput Algorithm: {}",
-          appAttemptId.getApplicationId(), rejectedAlgoRequests);
-      rejectedAlgoRequests.stream()
-          .filter(req -> req.getPlacementAttempt() < retryAttempts)
-          .forEach(req -> handleSchedulingResponse(
-              new Response(false, appAttemptId.getApplicationId(),
-                  req.getSchedulingRequest(), req.getPlacementAttempt(),
-                  null)));
-      ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
-          rejectedAlgoRequests.stream()
-              .filter(req -> req.getPlacementAttempt() >= retryAttempts)
-              .map(sr -> RejectedSchedulingRequest.newInstance(
-                  RejectionReason.COULD_NOT_PLACE_ON_NODE,
-                  sr.getSchedulingRequest()))
-              .collect(Collectors.toList()));
-    }
-    List<SchedulingRequest> rejectedRequests =
-        this.requestsToReject.get(appAttemptId.getApplicationId());
-    if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
-      synchronized (rejectedRequests) {
-        LOG.warn("Following requests of [{}] exhausted all retry attempts " +
-                "trying to schedule on placed node: {}",
-            appAttemptId.getApplicationId(), rejectedRequests);
-        ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
-            rejectedRequests.stream()
-                .map(sr -> RejectedSchedulingRequest.newInstance(
-                    RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr))
-                .collect(Collectors.toList()));
-        rejectedRequests.clear();
-      }
-    }
-  }
-
-  @Override
-  public void finishApplicationMaster(ApplicationAttemptId appAttemptId,
-      FinishApplicationMasterRequest request,
-      FinishApplicationMasterResponse response) {
-    constraintManager.unregisterApplication(appAttemptId.getApplicationId());
-    placementDispatcher.clearApplicationState(appAttemptId.getApplicationId());
-    requestsToReject.remove(appAttemptId.getApplicationId());
-    requestsToRetry.remove(appAttemptId.getApplicationId());
-    nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response);
-  }
-
-  private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {
-    int placementAttempt = ((Response)schedulerResponse).placementAttempt;
-    // Retry this placement as it is not successful and we are still
-    // under max retry. The req is batched with other unsuccessful
-    // requests from the same app
-    if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) {
-      List<BatchedRequests> reqsToRetry =
-          requestsToRetry.computeIfAbsent(
-              schedulerResponse.getApplicationId(),
-              k -> new ArrayList<>());
-      synchronized (reqsToRetry) {
-        addToRetryList(schedulerResponse, placementAttempt, reqsToRetry);
-      }
-      LOG.warn("Going to retry request for application [{}] after [{}]" +
-              " attempts: [{}]", schedulerResponse.getApplicationId(),
-          placementAttempt, schedulerResponse.getSchedulingRequest());
-    } else {
-      if (!schedulerResponse.isSuccess()) {
-        LOG.warn("Not retrying request for application [{}] after [{}]" +
-                " attempts: [{}]", schedulerResponse.getApplicationId(),
-            placementAttempt, schedulerResponse.getSchedulingRequest());
-        List<SchedulingRequest> reqsToReject =
-            requestsToReject.computeIfAbsent(
-                schedulerResponse.getApplicationId(),
-                k -> new ArrayList<>());
-        synchronized (reqsToReject) {
-          reqsToReject.add(schedulerResponse.getSchedulingRequest());
-        }
-      }
-    }
-  }
-
-  private void addToRetryList(SchedulingResponse schedulerResponse,
-      int placementAttempt, List<BatchedRequests> reqsToRetry) {
-    boolean isAdded = false;
-    for (BatchedRequests br : reqsToRetry) {
-      if (br.getPlacementAttempt() == placementAttempt + 1) {
-        br.addToBatch(schedulerResponse.getSchedulingRequest());
-        br.addToBlacklist(
-            schedulerResponse.getSchedulingRequest().getAllocationTags(),
-            ((Response) schedulerResponse).attemptedNode);
-        isAdded = true;
-        break;
-      }
-    }
-    if (!isAdded) {
-      BatchedRequests br = new BatchedRequests(iteratorType,
-          schedulerResponse.getApplicationId(),
-          Collections.singleton(schedulerResponse.getSchedulingRequest()),
-          placementAttempt + 1);
-      reqsToRetry.add(br);
-      br.addToBlacklist(
-          schedulerResponse.getSchedulingRequest().getAllocationTags(),
-          ((Response) schedulerResponse).attemptedNode);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java
new file mode 100644
index 0000000..5332e34
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SchedulerPlacementProcessor.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Forwards SchedulingRequests to be handled directly by the scheduler, provided
+ * the configured scheduler supports SchedulingRequests.
+ */
+public class SchedulerPlacementProcessor extends AbstractPlacementProcessor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SchedulerPlacementProcessor.class);
+
+  @Override
+  public void allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse response) throws YarnException {
+    if (request.getSchedulingRequests() != null
+        && !request.getSchedulingRequests().isEmpty()) {
+      if (!(scheduler instanceof CapacityScheduler)) {
+        String message = "Found non-empty SchedulingRequest of "
+            + "AllocateRequest for application=" + appAttemptId.toString()
+            + ", however the configured scheduler="
+            + scheduler.getClass().getCanonicalName()
+            + " cannot handle placement constraints, rejecting this "
+            + "allocate operation";
+        LOG.warn(message);
+        throw new YarnException(message);
+      }
+    }
+    nextAMSProcessor.allocate(appAttemptId, request, response);
+  }
+}
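
[Editor's note] For context on how the new handler configuration is exercised, below is a minimal sketch built only from the YarnConfiguration constants visible in this diff (RM_PLACEMENT_CONSTRAINTS_HANDLER, SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER, PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); the class and method names are illustrative and not part of this commit. The test changes further down replace the old SCHEDULING_REQUEST_ALLOWED boolean with one of these handler values.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    /** Illustrative helper only; not part of this commit. */
    public class PlacementHandlerConfigSketch {

      /**
       * Hand SchedulingRequests directly to the scheduler. Per
       * SchedulerPlacementProcessor#allocate above, this path only works when
       * the configured scheduler is CapacityScheduler; other schedulers reject
       * allocate calls that carry SchedulingRequests.
       */
      public static Configuration schedulerHandled() {
        Configuration conf = new YarnConfiguration();
        conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
            YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
        return conf;
      }

      /** Route SchedulingRequests through the placement processor chain instead. */
      public static Configuration processorHandled() {
        Configuration conf = new YarnConfiguration();
        conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
            YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
        return conf;
      }
    }

The TestCapacitySchedulerSchedulingRequestUpdate and TestSchedulingRequestContainerAllocation hunks below follow the scheduler-handled pattern, while TestPlacementProcessor uses the processor-handled one.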

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
index 484d780..ee7e013 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSchedulingRequestUpdate.java
@@ -50,6 +50,8 @@ public class TestCapacitySchedulerSchedulingRequestUpdate
     Configuration conf = TestUtils.getConfigurationWithQueueLabels(
         new Configuration(false));
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
 
     final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);
@@ -166,6 +168,8 @@ public class TestCapacitySchedulerSchedulingRequestUpdate
     Configuration conf = TestUtils.getConfigurationWithQueueLabels(
         new Configuration(false));
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
 
     final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
index b297f79..27d8661 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
@@ -58,8 +58,8 @@ public class TestSchedulingRequestContainerAllocation {
   public void testIntraAppAntiAffinity() throws Exception {
     Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
         new Configuration());
-    csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
-        true);
+    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
 
     // inject node label manager
     MockRM rm1 = new MockRM(csConf) {
@@ -141,8 +141,8 @@ public class TestSchedulingRequestContainerAllocation {
   public void testIntraAppAntiAffinityWithMultipleTags() throws Exception {
     Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
         new Configuration());
-    csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
-        true);
+    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
 
     // inject node label manager
     MockRM rm1 = new MockRM(csConf) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
index fc1cb0d..d1d05dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocationAsync.java
@@ -57,13 +57,13 @@ public class TestSchedulingRequestContainerAllocationAsync {
   private void testIntraAppAntiAffinityAsync(int numThreads) throws Exception {
     Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
         new Configuration());
-    csConf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
-        true);
     csConf.setInt(
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
         numThreads);
     csConf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
         + ".scheduling-interval-ms", 0);
+    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
 
     // inject node label manager
     MockRM rm1 = new MockRM(csConf) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 7180e24..fae63be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -275,9 +275,7 @@ public class TestUtils {
   public static Configuration getConfigurationWithQueueLabels(Configuration config) {
     CapacitySchedulerConfiguration conf =
         new CapacitySchedulerConfiguration(config);
-    conf.setBoolean(CapacitySchedulerConfiguration.SCHEDULING_REQUEST_ALLOWED,
-        true);
-    
+
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
     conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/41708402/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index c4c0b5d..e129a75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -86,8 +86,8 @@ public class TestPlacementProcessor {
     YarnConfiguration conf = new YarnConfiguration(csConf);
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
-    conf.setBoolean(
-        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
     conf.setInt(
         YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1);
     startRM(conf);
@@ -381,8 +381,8 @@ public class TestPlacementProcessor {
     YarnConfiguration conf = new YarnConfiguration(csConf);
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
-    conf.setBoolean(
-        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
     startRM(conf);
 
     HashMap<NodeId, MockNM> nodes = new HashMap<>();
@@ -533,8 +533,8 @@ public class TestPlacementProcessor {
     YarnConfiguration conf = new YarnConfiguration(csConf);
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
-    conf.setBoolean(
-        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
     conf.setInt(
         YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2);
     startRM(conf);

