Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2019/09/24 20:52:50 UTC

[hadoop] branch branch-2 updated: YARN-9730. Support forcing configured partitions to be exclusive based on app node label

This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new eedbf9d  YARN-9730. Support forcing configured partitions to be exclusive based on app node label
eedbf9d is described below

commit eedbf9d195f9270d3b6128a638c8eaf1a005aa78
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Tue Sep 24 12:13:29 2019 -0700

    YARN-9730. Support forcing configured partitions to be exclusive based on app node label
    
    (cherry picked from commit 73a044a63822303f792183244e25432528ecfb1e)
    (cherry picked from commit dd094d79023f6598e47146166aa8c213e03d41b7)
    (cherry picked from commit 10bdcb6f1da3b86146efa479c0bbc8d1da505789)
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |   6 +
 .../src/main/resources/yarn-default.xml            |   9 +
 .../resourcemanager/DefaultAMSProcessor.java       |   6 +
 .../yarn/server/resourcemanager/RMAppManager.java  |   6 +
 .../yarn/server/resourcemanager/RMContext.java     |   4 +
 .../yarn/server/resourcemanager/RMContextImpl.java |  15 ++
 .../scheduler/SchedulerApplicationAttempt.java     |   9 +
 .../resourcemanager/scheduler/SchedulerUtils.java  |  23 ++
 .../capacity/CapacitySchedulerConfiguration.java   |   7 +
 .../scheduler/capacity/LeafQueue.java              |   8 +-
 .../policy/AbstractComparatorOrderingPolicy.java   |   8 +-
 .../FifoOrderingPolicyWithExclusivePartitions.java | 144 ++++++++++++
 ...chedulableEntity.java => IteratorSelector.java} |  48 ++--
 .../scheduler/policy/OrderingPolicy.java           |   3 +-
 .../scheduler/policy/SchedulableEntity.java        |   5 +
 .../scheduler/TestSchedulerUtils.java              | 142 ++++++++++++
 .../scheduler/capacity/TestCapacityScheduler.java  |  11 +-
 .../scheduler/capacity/TestLeafQueue.java          | 143 +++++++++++-
 .../scheduler/policy/MockSchedulableEntity.java    |  15 +-
 .../scheduler/policy/TestFairOrderingPolicy.java   |  12 +-
 .../scheduler/policy/TestFifoOrderingPolicy.java   |   2 +-
 .../TestFifoOrderingPolicyForPendingApps.java      |   5 +-
 ...tFifoOrderingPolicyWithExclusivePartitions.java | 244 +++++++++++++++++++++
 23 files changed, 820 insertions(+), 55 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 76d4500..7139818 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3207,6 +3207,12 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
       CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
 
+  public static final String EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX
+      = "exclusive-enforced-partitions";
+
+  public static final String EXCLUSIVE_ENFORCED_PARTITIONS = NODE_LABELS_PREFIX
+      + EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX;
+
   public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
       YARN_PREFIX + "cluster.max-application-priority";
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 6574016..0c26b4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3557,4 +3557,13 @@
     <value>60000</value>
   </property>
 
+  <property>
+    <description>
+      Comma-separated list of partitions. If a label P is in this list,
+      then the RM will enforce that an app has resource requests with label
+      P iff that app's node label expression is P.
+    </description>
+    <name>yarn.node-labels.exclusive-enforced-partitions</name>
+    <value></value>
+  </property>
 </configuration>
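
For operators, a minimal sketch of the new setting as it might appear in a cluster's yarn-site.xml; the partition name "x" is illustrative:

  <property>
    <name>yarn.node-labels.exclusive-enforced-partitions</name>
    <value>x</value>
  </property>

With this in place, only applications submitted with node label expression "x" end up with ANY resource requests on partition "x", and such applications have their ANY requests forced onto "x", per the description above.
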
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 0baf17a..65bbaca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -206,6 +207,11 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
           && ResourceRequest.ANY.equals(req.getResourceName())) {
         req.setNodeLabelExpression(asc.getNodeLabelExpression());
       }
+      if (ResourceRequest.ANY.equals(req.getResourceName())) {
+        SchedulerUtils.enforcePartitionExclusivity(req,
+            getRmContext().getExclusiveEnforcedPartitions(),
+            asc.getNodeLabelExpression());
+      }
     }
 
     Resource maximumCapacity =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 84a5383..5b50623 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -90,6 +91,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
   private final ApplicationACLsManager applicationACLsManager;
   private Configuration conf;
   private YarnAuthorizationProvider authorizer;
+  private Set<String> exclusiveEnforcedPartitions;
 
   public RMAppManager(RMContext context,
       YarnScheduler scheduler, ApplicationMasterService masterService,
@@ -110,6 +112,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
     }
     this.authorizer = YarnAuthorizationProvider.getInstance(conf);
+    this.exclusiveEnforcedPartitions = context.getExclusiveEnforcedPartitions();
   }
 
   /**
@@ -490,6 +493,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
           throw new InvalidResourceRequestException("Invalid resource request, "
               + "no resource request specified with " + ResourceRequest.ANY);
         }
+        SchedulerUtils.enforcePartitionExclusivity(anyReq,
+            exclusiveEnforcedPartitions,
+            submissionContext.getNodeLabelExpression());
 
         // Make sure that all of the requests agree with the ANY request
         // and have correct values
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index b255a30..b5cc5a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.nio.ByteBuffer;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -161,4 +162,7 @@ public interface RMContext extends ApplicationMasterServiceContext {
   ResourceManager getResourceManager();
 
   String getAppProxyUrl(Configuration conf, ApplicationId applicationId);
+
+  Set<String> getExclusiveEnforcedPartitions();
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index d7c624d..ee9aa0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
@@ -33,6 +35,7 @@ import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.ConfigurationProvider;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
@@ -582,4 +585,16 @@ public class RMContextImpl implements RMContext {
       return UNAVAILABLE;
     }
   }
+
+  public Set<String> getExclusiveEnforcedPartitions() {
+    String[] configuredPartitions = getYarnConfiguration().getStrings(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS);
+    Set<String> exclusiveEnforcedPartitions = new HashSet<>();
+    if (configuredPartitions != null) {
+      for (String partition : configuredPartitions) {
+        exclusiveEnforcedPartitions.add(partition);
+      }
+    }
+    return exclusiveEnforcedPartitions;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 409f184..0685f5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -199,6 +199,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private AtomicLong unconfirmedAllocatedMem = new AtomicLong();
   private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
 
+  private String nodeLabelExpression;
+
   public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, AbstractUsersManager abstractUsersManager,
       RMContext rmContext) {
@@ -223,6 +225,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
         unmanagedAM = appSubmissionContext.getUnmanagedAM();
         this.logAggregationContext =
             appSubmissionContext.getLogAggregationContext();
+        this.nodeLabelExpression =
+            appSubmissionContext.getNodeLabelExpression();
       }
     }
 
@@ -1388,4 +1392,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       return diagnosticMessage;
     }
   }
+
+  @Override
+  public String getPartition() {
+    return nodeLabelExpression == null ? "" : nodeLabelExpression;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 041a762..f8fe979 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -258,6 +258,29 @@ public class SchedulerUtils {
   }
 
   /**
+   * If RM should enforce partition exclusivity for enforced partition "x":
+   * 1) If request is "x" and app label is not "x",
+   *    override request to app's label.
+   * 2) If app label is "x", ensure request is "x".
+   * @param resReq resource request
+   * @param enforcedPartitions list of exclusive enforced partitions
+   * @param appLabel app's node label expression
+   */
+  public static void enforcePartitionExclusivity(ResourceRequest resReq,
+      Set<String> enforcedPartitions, String appLabel) {
+    if (enforcedPartitions == null || enforcedPartitions.isEmpty()) {
+      return;
+    }
+    if (!enforcedPartitions.contains(appLabel)
+        && enforcedPartitions.contains(resReq.getNodeLabelExpression())) {
+      resReq.setNodeLabelExpression(appLabel);
+    }
+    if (enforcedPartitions.contains(appLabel)) {
+      resReq.setNodeLabelExpression(appLabel);
+    }
+  }
+
+  /**
    * Utility method to validate a resource request, by insuring that the
    * requested memory/vcore is non-negative and not greater than max
    * 
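
To make the enforcement rules above concrete, a small self-contained sketch; the label names "x", "y", and "other" are illustrative, and the new TestSchedulerUtils cases further below exercise the same behavior:

  import java.util.Collections;
  import java.util.Set;

  import org.apache.hadoop.yarn.api.records.Priority;
  import org.apache.hadoop.yarn.api.records.Resource;
  import org.apache.hadoop.yarn.api.records.ResourceRequest;
  import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
  import org.apache.hadoop.yarn.server.utils.BuilderUtils;

  public class EnforcePartitionExclusivityExample {
    public static void main(String[] args) {
      // "x" is the only exclusive-enforced partition in this sketch.
      Set<String> enforced = Collections.singleton("x");
      ResourceRequest rr = BuilderUtils.newResourceRequest(
          Priority.newInstance(1), ResourceRequest.ANY,
          Resource.newInstance(1024, 1), 1);

      // 1) Request asks for "x" but the app's label is something else:
      //    the request is pulled back to the app's label.
      rr.setNodeLabelExpression("x");
      SchedulerUtils.enforcePartitionExclusivity(rr, enforced, "other");
      assert "other".equals(rr.getNodeLabelExpression());

      // 2) App's label is "x": the ANY request is forced onto "x",
      //    whatever it originally asked for.
      rr.setNodeLabelExpression(null);
      SchedulerUtils.enforcePartitionExclusivity(rr, enforced, "x");
      assert "x".equals(rr.getNodeLabelExpression());

      // 3) Neither the request label nor the app label is enforced:
      //    the request is left untouched.
      rr.setNodeLabelExpression("y");
      SchedulerUtils.enforcePartitionExclusivity(rr, enforced, "other");
      assert "y".equals(rr.getNodeLabelExpression());
    }
  }
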
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 0b9d9bf..29fb253 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.P
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -152,6 +153,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
 
   public static final String FAIR_APP_ORDERING_POLICY = "fair";
 
+  public static final String FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY
+      = "fifo-with-partitions";
+
   public static final String DEFAULT_APP_ORDERING_POLICY =
       FIFO_APP_ORDERING_POLICY;
   
@@ -469,6 +473,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
        policyType = FairOrderingPolicy.class.getName();
     }
+    if (policyType.trim().equals(FIFO_WITH_PARTITIONS_APP_ORDERING_POLICY)) {
+      policyType = FifoOrderingPolicyWithExclusivePartitions.class.getName();
+    }
     try {
       orderingPolicy = (OrderingPolicy<S>)
         Class.forName(policyType).newInstance();
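
The new policy name can then be selected per leaf queue. A hedged sketch of the corresponding capacity-scheduler.xml entries, where root.a is an illustrative queue path and the second key follows the pattern documented on FifoOrderingPolicyWithExclusivePartitions below:

  <property>
    <name>yarn.scheduler.capacity.root.a.ordering-policy</name>
    <value>fifo-with-partitions</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.a.ordering-policy.exclusive-enforced-partitions</name>
    <value>x</value>
  </property>
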
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 993f089..546c334 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@@ -787,7 +788,8 @@ public class LeafQueue extends AbstractCSQueue {
       }
 
       for (Iterator<FiCaSchedulerApp> fsApp =
-           getPendingAppsOrderingPolicy().getAssignmentIterator();
+           getPendingAppsOrderingPolicy()
+               .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
            fsApp.hasNext(); ) {
         FiCaSchedulerApp application = fsApp.next();
         ApplicationId applicationId = application.getApplicationId();
@@ -1075,8 +1077,10 @@ public class LeafQueue extends AbstractCSQueue {
 
     Map<String, CachedUserLimit> userLimits = new HashMap<>();
     boolean needAssignToQueueCheck = true;
+    IteratorSelector sel = new IteratorSelector();
+    sel.setPartition(ps.getPartition());
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
-         orderingPolicy.getAssignmentIterator();
+         orderingPolicy.getAssignmentIterator(sel);
          assignmentIterator.hasNext(); ) {
       FiCaSchedulerApp application = assignmentIterator.next();
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index 09dd3bf..9f40d66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -46,13 +46,13 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
   public Collection<S> getSchedulableEntities() {
     return schedulableEntities;
   }
-  
+
   @Override
-  public Iterator<S> getAssignmentIterator() {
+  public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
     reorderScheduleEntities();
     return schedulableEntities.iterator();
   }
-  
+
   @Override
   public Iterator<S> getPreemptionIterator() {
     reorderScheduleEntities();
@@ -137,5 +137,5 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
 
   @Override
   public abstract String getInfo();
-  
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java
new file mode 100644
index 0000000..1b1ef66
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyWithExclusivePartitions.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+
+/**
+ * Similar to {@link FifoOrderingPolicy}, but with separate ordering policies
+ * for each partition in
+ * {@code yarn.scheduler.capacity.<queue-path>.ordering-policy.exclusive-enforced-partitions}.
+ */
+public class FifoOrderingPolicyWithExclusivePartitions<S extends SchedulableEntity>
+    implements OrderingPolicy<S> {
+
+  private static final String DEFAULT_PARTITION = "DEFAULT_PARTITION";
+
+  private Map<String, OrderingPolicy<S>> orderingPolicies;
+
+  public FifoOrderingPolicyWithExclusivePartitions() {
+    this.orderingPolicies = new HashMap<>();
+    this.orderingPolicies.put(DEFAULT_PARTITION, new FifoOrderingPolicy());
+  }
+
+  public Collection<S> getSchedulableEntities() {
+    return unionOrderingPolicies().getSchedulableEntities();
+  }
+
+  public Iterator<S> getAssignmentIterator(IteratorSelector sel) {
+    // Return schedulable entities only from filtered partition
+    return getPartitionOrderingPolicy(sel.getPartition())
+        .getAssignmentIterator(sel);
+  }
+
+  public Iterator<S> getPreemptionIterator() {
+    // Entities from all partitions should be preemptible
+    return unionOrderingPolicies().getPreemptionIterator();
+  }
+
+  /**
+   * Union all schedulable entities from all ordering policies.
+   * @return ordering policy containing all schedulable entities
+   */
+  private OrderingPolicy<S> unionOrderingPolicies() {
+    OrderingPolicy<S> ret = new FifoOrderingPolicy<>();
+    for (Map.Entry<String, OrderingPolicy<S>> entry
+        : orderingPolicies.entrySet()) {
+      ret.addAllSchedulableEntities(entry.getValue().getSchedulableEntities());
+    }
+    return ret;
+  }
+
+  public void addSchedulableEntity(S s) {
+    getPartitionOrderingPolicy(s.getPartition()).addSchedulableEntity(s);
+  }
+
+  public boolean removeSchedulableEntity(S s) {
+    return getPartitionOrderingPolicy(s.getPartition())
+        .removeSchedulableEntity(s);
+  }
+
+  public void addAllSchedulableEntities(Collection<S> sc) {
+    for (S entity : sc) {
+      getPartitionOrderingPolicy(entity.getPartition())
+          .addSchedulableEntity(entity);
+    }
+  }
+
+  public int getNumSchedulableEntities() {
+    // Return total number of schedulable entities, to maintain parity with
+    // existing FifoOrderingPolicy e.g. when determining if queue has reached
+    // its max app limit
+    int ret = 0;
+    for (Map.Entry<String, OrderingPolicy<S>> entry
+        : orderingPolicies.entrySet()) {
+      ret += entry.getValue().getNumSchedulableEntities();
+    }
+    return ret;
+  }
+
+  public void containerAllocated(S schedulableEntity, RMContainer r) {
+    getPartitionOrderingPolicy(schedulableEntity.getPartition())
+        .containerAllocated(schedulableEntity, r);
+  }
+
+  public void containerReleased(S schedulableEntity, RMContainer r) {
+    getPartitionOrderingPolicy(schedulableEntity.getPartition())
+        .containerReleased(schedulableEntity, r);
+  }
+
+  public void demandUpdated(S schedulableEntity) {
+    getPartitionOrderingPolicy(schedulableEntity.getPartition())
+        .demandUpdated(schedulableEntity);
+  }
+
+  @Override
+  public void configure(Map<String, String> conf) {
+    if (conf == null) {
+      return;
+    }
+    String partitions =
+        conf.get(YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX);
+    if (partitions != null) {
+      for (String partition : partitions.split(",")) {
+        partition = partition.trim();
+        if (!partition.isEmpty()) {
+          this.orderingPolicies.put(partition, new FifoOrderingPolicy());
+        }
+      }
+    }
+  }
+
+  @Override
+  public String getInfo() {
+    return "FifoOrderingPolicyWithExclusivePartitions";
+  }
+
+  private OrderingPolicy<S> getPartitionOrderingPolicy(String partition) {
+    String keyPartition = orderingPolicies.containsKey(partition) ?
+        partition : DEFAULT_PARTITION;
+    return orderingPolicies.get(keyPartition);
+  }
+}
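
A minimal sketch of how the new policy and IteratorSelector fit together, written against the SchedulableEntity interface; the partition name "x" and the configuration map mirror what the TestLeafQueue change below does for a real leaf queue:

  // Build the policy and mark "x" as an exclusive-enforced partition;
  // all other entities share the DEFAULT_PARTITION FIFO policy.
  FifoOrderingPolicyWithExclusivePartitions<SchedulableEntity> policy =
      new FifoOrderingPolicyWithExclusivePartitions<>();
  policy.configure(java.util.Collections.singletonMap(
      YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, "x"));

  // addSchedulableEntity(...) routes each entity by its getPartition()
  // value, so apps submitted to "x" queue up separately from the rest.

  // On a heartbeat from a node in partition "x", LeafQueue builds a
  // selector so that only apps submitted to "x" are offered containers:
  IteratorSelector sel = new IteratorSelector();
  sel.setPartition("x");
  java.util.Iterator<SchedulableEntity> assignment =
      policy.getAssignmentIterator(sel);

  // Preemption and the entity count still span every partition, so queue
  // max-application limits keep their existing meaning:
  java.util.Iterator<SchedulableEntity> preemption =
      policy.getPreemptionIterator();
  int total = policy.getNumSchedulableEntities();
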
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
similarity index 50%
copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
index 41b83ce..0e9b55f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/IteratorSelector.java
@@ -18,41 +18,31 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-
-
 /**
- * A SchedulableEntity is a process to be scheduled.
- * for example, an application / application attempt
+ * IteratorSelector contains information needed to tell an
+ * {@link OrderingPolicy} what to return in an iterator.
  */
-public interface SchedulableEntity {
-  
-  /**
-   * Id - each entity must have a unique id
-   */
-  public String getId();
-  
-  /**
-   * Compare the passed SchedulableEntity to this one for input order.
-   * Input order is implementation defined and should reflect the 
-   * correct ordering for first-in first-out processing
-   */
-  public int compareInputOrderTo(SchedulableEntity other);
-  
-  /**
-   * View of Resources wanted and consumed by the entity
-   */
-  public ResourceUsage getSchedulingResourceUsage();
-  
+public class IteratorSelector {
+
+  public static final IteratorSelector EMPTY_ITERATOR_SELECTOR =
+      new IteratorSelector();
+
+  private String partition;
+
   /**
-   * Get the priority of the application
+   * The partition for this iterator selector.
+   * @return partition
    */
-  public Priority getPriority();
+  public String getPartition() {
+    return this.partition;
+  }
 
   /**
-   * Whether application was running before RM restart.
+   * Set partition for this iterator selector.
+   * @param p partition
    */
-  public boolean isRecovering();
+  public void setPartition(String p) {
+    this.partition = p;
+  }
 
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
index 9aacc7e..66b6a59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
@@ -45,10 +45,11 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
   /**
    * Return an iterator over the collection of {@link SchedulableEntity}
    * objects which orders them for container assignment.
+   * @param sel the {@link IteratorSelector} to filter with
    * @return an iterator over the collection of {@link SchedulableEntity}
    * objects
    */
-  public Iterator<S> getAssignmentIterator();
+  Iterator<S> getAssignmentIterator(IteratorSelector sel);
 
   /**
    * Return an iterator over the collection of {@link SchedulableEntity}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
index 41b83ce..be83556 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java
@@ -55,4 +55,9 @@ public interface SchedulableEntity {
    */
   public boolean isRecovering();
 
+  /**
+   * Get partition corresponding to this entity.
+   * @return partition
+   */
+  String getPartition();
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index cdc67ed..57b195f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -32,6 +32,7 @@ import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -798,6 +799,147 @@ public class TestSchedulerUtils {
     System.err.println("Failed to wait scheduler application attempt stopped.");
   }
 
+  @Test
+  public void testEnforcePartitionExclusivity() {
+    String enforcedExclusiveLabel = "x";
+    Set<String> enforcedExclusiveLabelSet =
+        Collections.singleton(enforcedExclusiveLabel);
+    String dummyLabel = "y";
+    String appLabel = "appLabel";
+    ResourceRequest rr = BuilderUtils.newResourceRequest(
+        mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
+
+    // RR label unset and app label does not match. Nothing should happen.
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertNull(rr.getNodeLabelExpression());
+
+    // RR label and app label do not match. Nothing should happen.
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+
+    // RR label matches but app label does not. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
+
+    // RR label unset and app label matches. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(null);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedExclusiveLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+    // RR label does not match and app label matches. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedExclusiveLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+    // RR label and app label matches. Nothing should happen.
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedExclusiveLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+
+    // Unconfigured label: nothing should happen.
+    rr.setNodeLabelExpression(null);
+    SchedulerUtils.enforcePartitionExclusivity(rr, null,
+        appLabel);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, null,
+        appLabel);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(enforcedExclusiveLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, null,
+        appLabel);
+    Assert.assertEquals(enforcedExclusiveLabel, rr.getNodeLabelExpression());
+  }
+
+  @Test
+  public void testEnforcePartitionExclusivityMultipleLabels() {
+    String enforcedLabel1 = "x";
+    String enforcedLabel2 = "y";
+    Set<String> enforcedExclusiveLabelSet = new HashSet<>();
+    enforcedExclusiveLabelSet.add(enforcedLabel1);
+    enforcedExclusiveLabelSet.add(enforcedLabel2);
+    String dummyLabel = "dummyLabel";
+    String appLabel = "appLabel";
+    ResourceRequest rr = BuilderUtils.newResourceRequest(
+        mock(Priority.class), ResourceRequest.ANY, mock(Resource.class), 1);
+
+    // RR label unset and app label does not match. Nothing should happen.
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertNull(rr.getNodeLabelExpression());
+
+    // RR label and app label do not match. Nothing should happen.
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(dummyLabel, rr.getNodeLabelExpression());
+
+    // RR label matches but app label does not. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(enforcedLabel1);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        null);
+    Assert.assertNull(rr.getNodeLabelExpression());
+    rr.setNodeLabelExpression(enforcedLabel2);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        appLabel);
+    Assert.assertEquals(appLabel, rr.getNodeLabelExpression());
+
+    // RR label unset and app label matches. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(null);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel1);
+    Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+
+    // RR label does not match and app label matches. RR label should be set
+    // to app label
+    rr.setNodeLabelExpression(dummyLabel);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel2);
+    Assert.assertEquals(enforcedLabel2, rr.getNodeLabelExpression());
+
+    // RR label and app label matches. Nothing should happen.
+    rr.setNodeLabelExpression(enforcedLabel1);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel1);
+    Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+
+    // RR label and app label don't match, but they're both enforced labels.
+    // RR label should be set to app label.
+    rr.setNodeLabelExpression(enforcedLabel2);
+    SchedulerUtils.enforcePartitionExclusivity(rr, enforcedExclusiveLabelSet,
+        enforcedLabel1);
+    Assert.assertEquals(enforcedLabel1, rr.getNodeLabelExpression());
+  }
+
   public static SchedulerApplication<SchedulerApplicationAttempt>
       verifyAppAddedAndRemovedFromScheduler(
           Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 0dd79c8..5922803 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -124,6 +124,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -1229,8 +1230,9 @@ public class TestCapacityScheduler {
     //This happens because app2 has no demand/a magnitude of NaN, which
     //results in app1 and app2 being equal in the fairness comparison and
     //failling back to fifo (start) ordering
-    assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
-      appId1.toString());
+    assertEquals(q.getOrderingPolicy().getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
+        appId1.toString());
 
     //Now, allocate for app2 (this would be the first/AM allocation)
     ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
@@ -1243,8 +1245,9 @@ public class TestCapacityScheduler {
     //verify re-ordering based on the allocation alone
 
     //Now, the first app for assignment is app2
-    assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
-      appId2.toString());
+    assertEquals(q.getOrderingPolicy().getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId(),
+        appId2.toString());
 
     rm.stop();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 3b76fb2..b2283fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@@ -122,6 +123,8 @@ public class TestLeafQueue {
   final static int GB = 1024;
   final static String DEFAULT_RACK = "/default";
 
+  private final static String LABEL = "test";
+
   private final ResourceCalculator resourceCalculator =
       new DefaultResourceCalculator();
   
@@ -130,14 +133,19 @@ public class TestLeafQueue {
 
   @Before
   public void setUp() throws Exception {
-    setUpInternal(resourceCalculator);
+    setUpInternal(resourceCalculator, false);
   }
 
   private void setUpWithDominantResourceCalculator() throws Exception {
-    setUpInternal(dominantResourceCalculator);
+    setUpInternal(dominantResourceCalculator, false);
+  }
+
+  private void setUpWithNodeLabels() throws Exception {
+    setUpInternal(resourceCalculator, true);
   }
 
-  private void setUpInternal(ResourceCalculator rC) throws Exception {
+  private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
+      throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
     queues = new HashMap<String, CSQueue>();
     cs = spy(spyCs);
@@ -162,7 +170,7 @@ public class TestLeafQueue {
     csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
         false);
     final String newRoot = "root" + System.currentTimeMillis();
-    setupQueueConfiguration(csConf, newRoot);
+    setupQueueConfiguration(csConf, newRoot, withNodeLabels);
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
 
@@ -216,24 +224,39 @@ public class TestLeafQueue {
   private static final String E = "e";
   private void setupQueueConfiguration(
       CapacitySchedulerConfiguration conf, 
-      final String newRoot) {
+      final String newRoot, boolean withNodeLabels) {
     
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {newRoot});
     conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
     conf.setAcl(CapacitySchedulerConfiguration.ROOT,
       QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (withNodeLabels) {
+      conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, LABEL, 100);
+      conf.setMaximumCapacityByLabel(CapacitySchedulerConfiguration.ROOT,
+          LABEL, 100);
+    }
     
     final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
     conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
     conf.setCapacity(Q_newRoot, 100);
     conf.setMaximumCapacity(Q_newRoot, 100);
     conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (withNodeLabels) {
+      conf.setAccessibleNodeLabels(Q_newRoot, Collections.singleton(LABEL));
+      conf.setCapacityByLabel(Q_newRoot, LABEL, 100);
+      conf.setMaximumCapacityByLabel(Q_newRoot, LABEL, 100);
+    }
 
     final String Q_A = Q_newRoot + "." + A;
     conf.setCapacity(Q_A, 8.5f);
     conf.setMaximumCapacity(Q_A, 20);
     conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
+    if (withNodeLabels) {
+      conf.setAccessibleNodeLabels(Q_A, Collections.singleton(LABEL));
+      conf.setCapacityByLabel(Q_A, LABEL, 100);
+      conf.setMaximumCapacityByLabel(Q_A, LABEL, 100);
+    }
     
     final String Q_B = Q_newRoot + "." + B;
     conf.setCapacity(Q_B, 80);
@@ -3229,6 +3252,116 @@ public class TestLeafQueue {
   }
 
   @Test
+  public void testFifoWithPartitionsAssignment() throws Exception {
+    setUpWithNodeLabels();
+
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+    OrderingPolicy<FiCaSchedulerApp> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, LABEL));
+    a.setOrderingPolicy(policy);
+    String host00 = "127.0.0.1";
+    String rack0 = "rack_0";
+    FiCaSchedulerNode node00 = TestUtils.getMockNode(host00, rack0, 0,
+        16 * GB);
+    when(node00.getPartition()).thenReturn(LABEL);
+    String host01 = "127.0.0.2";
+    FiCaSchedulerNode node01 = TestUtils.getMockNode(host01, rack0, 0,
+        16 * GB);
+    when(node01.getPartition()).thenReturn("");
+
+    final int numNodes = 4;
+    Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
+        numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    String user0 = "user_0";
+
+    final ApplicationAttemptId appAttemptId0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app0 = spy(new FiCaSchedulerApp(appAttemptId0, user0, a,
+        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5),
+        false));
+    a.submitApplicationAttempt(app0, user0);
+
+    final ApplicationAttemptId appAttemptId1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app1 = spy(new FiCaSchedulerApp(appAttemptId1, user0, a,
+        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3),
+        false));
+    when(app1.getPartition()).thenReturn(LABEL);
+    a.submitApplicationAttempt(app1, user0);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
+        app1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node00.getNodeID(),
+        node00, node01.getNodeID(), node01);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app0Requests = new ArrayList<>();
+    List<ResourceRequest> app1Requests = new ArrayList<>();
+
+    app0Requests.clear();
+    app0Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
+            recordFactory));
+    app0.updateResourceRequests(app0Requests);
+
+    app1Requests.clear();
+    app1Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory, LABEL));
+    app1.updateResourceRequests(app1Requests);
+
+    // app_1 will get containers since it is exclusive-enforced
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node00,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+    // app_0 should not get resources from node_0_0 since the labels
+    // don't match
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node00,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(0 * GB, app0.getCurrentConsumption().getMemorySize());
+
+    app1Requests.clear();
+    app1Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory, LABEL));
+    app1.updateResourceRequests(app1Requests);
+
+    // When node_0_1 heartbeats, app_0 should get containers
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node01,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+
+    app0Requests.clear();
+    app0Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory));
+    app0.updateResourceRequests(app0Requests);
+
+    // When node_0_0 heartbeats, app_1 should get containers again
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node00,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(2 * GB, app1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+  }
+
+  @Test
   public void testConcurrentAccess() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     MockRM rm = new MockRM();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
index 4f251bf..62f7a49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java
@@ -18,21 +18,19 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 
-import java.util.*;
-
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 
 
 public class MockSchedulableEntity implements SchedulableEntity {
-  
+
   private String id;
   private long serial = 0;
   private Priority priority;
   private boolean isRecovering;
+  private String partition = "";
 
   public MockSchedulableEntity() { }
   
@@ -101,4 +99,13 @@ public class MockSchedulableEntity implements SchedulableEntity {
   protected void setRecovering(boolean entityRecovering) {
     this.isRecovering = entityRecovering;
   }
+
+  @Override
+  public String getPartition() {
+    return partition;
+  }
+
+  public void setPartition(String partition) {
+    this.partition = partition;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
index 683173a..e023e01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
@@ -126,19 +126,25 @@ public class TestFairOrderingPolicy {
 
 
     //Assignment, least to greatest consumption
-    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+    checkIds(schedOrder.getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR),
+        new String[]{"3", "2", "1"});
 
     //Preemption, greatest to least
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
 
     //Change value without inform, should see no change
     msp2.setUsed(Resources.createResource(6));
-    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+    checkIds(schedOrder.getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR),
+        new String[]{"3", "2", "1"});
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
 
     //Do inform, will reorder
     schedOrder.containerAllocated(msp2, null);
-    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
+    checkIds(schedOrder.getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR),
+        new String[]{"3", "1", "2"});
     checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
index 447618f4..d94d077 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicy.java
@@ -61,7 +61,7 @@ public class TestFifoOrderingPolicy {
     schedOrder.addSchedulableEntity(msp3);
     
     //Assignment, oldest to youngest
-    checkSerials(schedOrder.getAssignmentIterator(), new long[]{1, 2, 3});
+    checkSerials(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[]{1, 2, 3});
     
     //Preemption, youngest to oldest
     checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1});
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
index befa8e6..3996b0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyForPendingApps.java
@@ -72,8 +72,9 @@ public class TestFifoOrderingPolicyForPendingApps {
     schedOrder.addSchedulableEntity(msp7);
 
     // Assignment with serial id's are 3,2,4,1,6,5,7
-    checkSerials(schedOrder.getAssignmentIterator(), new long[] { 3, 2, 4, 1,
-        6, 5, 7 });
+    checkSerials(schedOrder.getAssignmentIterator(
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[] {3, 2, 4, 1,
+        6, 5, 7});
 
     //Preemption, youngest to oldest
     checkSerials(schedOrder.getPreemptionIterator(), new long[] { 7, 5, 6, 1,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java
new file mode 100644
index 0000000..499a70a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFifoOrderingPolicyWithExclusivePartitions.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests {@link FifoOrderingPolicyWithExclusivePartitions} ordering policy.
+ */
+public class TestFifoOrderingPolicyWithExclusivePartitions {
+
+  private static final String PARTITION = "test";
+  private static final String PARTITION2 = "test2";
+
+  @Test
+  public void testNoConfiguredExclusiveEnforcedPartitions() {
+    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.EMPTY_MAP);
+
+    MockSchedulableEntity p1 = new MockSchedulableEntity(4, 0, false);
+    p1.setPartition(PARTITION);
+    p1.setId("p1");
+    MockSchedulableEntity p2 = new MockSchedulableEntity(3, 1, false);
+    p2.setPartition(PARTITION);
+    p2.setId("p2");
+
+    MockSchedulableEntity r1 = new MockSchedulableEntity(2, 0, false);
+    r1.setId("r1");
+    MockSchedulableEntity r2 = new MockSchedulableEntity(1, 0, false);
+    r2.setId("r2");
+
+    policy.addSchedulableEntity(p1);
+    policy.addAllSchedulableEntities(Arrays.asList(p2, r1, r2));
+    Assert.assertEquals(4, policy.getNumSchedulableEntities());
+    Assert.assertEquals(4, policy.getSchedulableEntities().size());
+    IteratorSelector sel = new IteratorSelector();
+    // Should behave like FifoOrderingPolicy, regardless of partition
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "p2", "r2", "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p2", "r2", "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1", "r2", "p2");
+
+    policy.removeSchedulableEntity(p2);
+    policy.removeSchedulableEntity(r2);
+    Assert.assertEquals(2, policy.getNumSchedulableEntities());
+    Assert.assertEquals(2, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "r1", "p1");
+    verifyPreemptionIteratorOrder(policy, "p1", "r1");
+  }
+
+  @Test
+  public void testSingleExclusiveEnforcedPartition() {
+    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, PARTITION));
+
+    // PARTITION iterator should return p2, p1, p3
+    MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
+    p1.setPartition(PARTITION);
+    p1.setId("p1");
+    MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
+    p2.setPartition(PARTITION);
+    p2.setId("p2");
+    MockSchedulableEntity p3 = new MockSchedulableEntity(3, 0, false);
+    p3.setPartition(PARTITION);
+    p3.setId("p3");
+
+    // non-PARTITION iterator should return r3, r2, r1
+    MockSchedulableEntity r1 = new MockSchedulableEntity(6, 0, false);
+    r1.setId("r1");
+    MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
+    r2.setId("r2");
+    MockSchedulableEntity r3 = new MockSchedulableEntity(2, 1, false);
+    r3.setId("r3");
+
+    policy.addSchedulableEntity(r1);
+    Assert.assertEquals(1, policy.getNumSchedulableEntities());
+    Assert.assertEquals("r1", policy.getSchedulableEntities()
+        .iterator().next().getId());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r1");
+    verifyPreemptionIteratorOrder(policy, "r1");
+
+    List<MockSchedulableEntity> entities = Arrays.asList(r2, r3, p1, p2);
+    policy.addAllSchedulableEntities(entities);
+    policy.addSchedulableEntity(p3);
+    Assert.assertEquals(6, policy.getNumSchedulableEntities());
+    Assert.assertEquals(6, policy.getSchedulableEntities().size());
+    // Assignment iterator should return non-PARTITION entities,
+    // in order based on FifoOrderingPolicy
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r2", "r1");
+    // Preemption iterator should return all entities, in global order
+    verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+    // Same thing as above, but with a non-empty partition
+    IteratorSelector sel = new IteratorSelector();
+    sel.setPartition("dummy");
+    verifyAssignmentIteratorOrder(policy, sel, "r3", "r2", "r1");
+    verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+    // Should return PARTITION entities, in order based on FifoOrderingPolicy
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p2", "p1", "p3");
+    verifyPreemptionIteratorOrder(policy, "r1", "r2", "p3", "p1", "p2", "r3");
+
+    policy.removeSchedulableEntity(p2);
+    policy.removeSchedulableEntity(r2);
+    Assert.assertEquals(4, policy.getNumSchedulableEntities());
+    Assert.assertEquals(4, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
+    verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p1", "p3");
+    verifyPreemptionIteratorOrder(policy, "r1", "p3", "p1", "r3");
+
+    policy.removeSchedulableEntity(p1);
+    policy.removeSchedulableEntity(p3);
+    Assert.assertEquals(2, policy.getNumSchedulableEntities());
+    Assert.assertEquals(2, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "r3", "r1");
+    verifyPreemptionIteratorOrder(policy, "r1", "r3");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel);
+    verifyPreemptionIteratorOrder(policy, "r1", "r3");
+  }
+
+  @Test
+  public void testMultipleExclusiveEnforcedPartitions() {
+    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX,
+        PARTITION + "," + PARTITION2));
+
+    // PARTITION iterator should return p2, p1
+    MockSchedulableEntity p1 = new MockSchedulableEntity(1, 0, false);
+    p1.setPartition(PARTITION);
+    p1.setId("p1");
+    MockSchedulableEntity p2 = new MockSchedulableEntity(5, 1, false);
+    p2.setPartition(PARTITION);
+    p2.setId("p2");
+
+    // PARTITION2 iterator should return r1, r2
+    MockSchedulableEntity r1 = new MockSchedulableEntity(3, 0, false);
+    r1.setPartition(PARTITION2);
+    r1.setId("r1");
+    MockSchedulableEntity r2 = new MockSchedulableEntity(4, 0, false);
+    r2.setPartition(PARTITION2);
+    r2.setId("r2");
+
+    // default iterator should return s2, s1
+    MockSchedulableEntity s1 = new MockSchedulableEntity(6, 0, false);
+    s1.setId("s1");
+    MockSchedulableEntity s2 = new MockSchedulableEntity(2, 0, false);
+    s2.setId("s2");
+
+    policy.addAllSchedulableEntities(Arrays.asList(s1, s2, r1));
+    Assert.assertEquals(3, policy.getNumSchedulableEntities());
+    Assert.assertEquals(3, policy.getSchedulableEntities().size());
+    IteratorSelector sel = new IteratorSelector();
+    // assignment iterator returns only default (non-partitioned) entities
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+    verifyPreemptionIteratorOrder(policy, "s1", "r1", "s2");
+    sel.setPartition(PARTITION2);
+    verifyAssignmentIteratorOrder(policy, sel, "r1");
+
+    policy.addAllSchedulableEntities(Arrays.asList(r2, p1, p2));
+    Assert.assertEquals(6, policy.getNumSchedulableEntities());
+    Assert.assertEquals(6, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p2", "p1");
+    sel.setPartition(PARTITION2);
+    verifyAssignmentIteratorOrder(policy, sel, "r1", "r2");
+    verifyPreemptionIteratorOrder(policy, "s1", "r2", "r1", "s2", "p1", "p2");
+
+    policy.removeSchedulableEntity(p2);
+    policy.removeSchedulableEntity(r1);
+    policy.removeSchedulableEntity(r2);
+    Assert.assertEquals(3, policy.getNumSchedulableEntities());
+    Assert.assertEquals(3, policy.getSchedulableEntities().size());
+    verifyAssignmentIteratorOrder(policy,
+        IteratorSelector.EMPTY_ITERATOR_SELECTOR, "s2", "s1");
+    sel.setPartition(PARTITION);
+    verifyAssignmentIteratorOrder(policy, sel, "p1");
+    sel.setPartition(PARTITION2);
+    verifyAssignmentIteratorOrder(policy, sel);
+    verifyPreemptionIteratorOrder(policy, "s1", "s2", "p1");
+  }
+
+  private void verifyAssignmentIteratorOrder(
+      FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
+      IteratorSelector sel, String... ids) {
+    verifyIteratorOrder(policy.getAssignmentIterator(sel), ids);
+  }
+
+  private void verifyPreemptionIteratorOrder(
+      FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy,
+      String... ids) {
+    verifyIteratorOrder(policy.getPreemptionIterator(), ids);
+  }
+
+  private void verifyIteratorOrder(Iterator<MockSchedulableEntity> itr,
+      String... ids) {
+    for (String id : ids) {
+      Assert.assertEquals(id, itr.next().getId());
+    }
+    Assert.assertFalse(itr.hasNext());
+  }
+}
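
For readers skimming the patch, here is a minimal usage sketch of the iterator-selection API exercised by the tests above. It is pieced together only from what the diff shows; the partition name "gpu", the class name ExclusivePartitionsSketch, and the use of the test-only MockSchedulableEntity are illustrative assumptions, not part of this commit. A real caller inside the CapacityScheduler would pass its own SchedulableEntity implementations and read the enforced partitions from configuration.

import java.util.Collections;
import java.util.Iterator;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.MockSchedulableEntity;

public class ExclusivePartitionsSketch {
  public static void main(String[] args) {
    // Enforce exclusivity for the "gpu" partition (illustrative name).
    FifoOrderingPolicyWithExclusivePartitions<MockSchedulableEntity> policy =
        new FifoOrderingPolicyWithExclusivePartitions<>();
    policy.configure(Collections.singletonMap(
        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, "gpu"));

    // One entity in the enforced partition, one in the default partition.
    MockSchedulableEntity gpuApp = new MockSchedulableEntity(1, 0, false);
    gpuApp.setPartition("gpu");
    gpuApp.setId("gpuApp");
    MockSchedulableEntity defaultApp = new MockSchedulableEntity(2, 0, false);
    defaultApp.setId("defaultApp");
    policy.addSchedulableEntity(gpuApp);
    policy.addSchedulableEntity(defaultApp);

    // The default assignment iterator skips entities in enforced partitions.
    Iterator<MockSchedulableEntity> defaultItr =
        policy.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
    System.out.println(defaultItr.next().getId());  // defaultApp

    // A selector targeting "gpu" yields only that partition's entities.
    IteratorSelector sel = new IteratorSelector();
    sel.setPartition("gpu");
    Iterator<MockSchedulableEntity> gpuItr = policy.getAssignmentIterator(sel);
    System.out.println(gpuItr.next().getId());      // gpuApp

    // The preemption iterator still walks every entity in one global order.
    Iterator<MockSchedulableEntity> preemptItr = policy.getPreemptionIterator();
    while (preemptItr.hasNext()) {
      System.out.println(preemptItr.next().getId());
    }
  }
}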

