Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/05/28 11:03:17 UTC

hadoop git commit: YARN-4781. Support intra-queue preemption for fairness ordering policy. Contributed by Eric Payne.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 61df174e8 -> 7c343669b


YARN-4781. Support intra-queue preemption for fairness ordering policy. Contributed by Eric Payne.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7c343669
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7c343669
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7c343669

Branch: refs/heads/trunk
Commit: 7c343669baf660df3b70d58987d6e68aec54d6fa
Parents: 61df174
Author: Sunil G <su...@apache.org>
Authored: Mon May 28 16:32:53 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Mon May 28 16:32:53 2018 +0530

----------------------------------------------------------------------
 .../FifoIntraQueuePreemptionPlugin.java         |  37 ++-
 .../capacity/IntraQueueCandidatesSelector.java  |  40 +++
 .../monitor/capacity/TempAppPerPartition.java   |   9 +
 .../AbstractComparatorOrderingPolicy.java       |   2 -
 ...alCapacityPreemptionPolicyMockFramework.java |  12 +-
 ...yPreemptionPolicyIntraQueueFairOrdering.java | 276 +++++++++++++++++++
 6 files changed, 366 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
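
For context, the new tests added below enable this code path roughly as follows. This is a minimal sketch using the same CapacitySchedulerConfiguration constants the test class uses; it is not part of the commit, the class name is hypothetical, and it assumes a queue named root.a. Note also that the fair comparator is only chosen when the intra-queue preemption order policy is USERLIMIT_FIRST, which I believe is the default.

  import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

  // Minimal sketch, not from the commit: mirrors the setup in the new
  // TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering tests.
  public class FairOrderingPreemptionExample {
    public static CapacitySchedulerConfiguration newConf() {
      CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
      // Allow preemption between applications within the same queue.
      conf.setBoolean(
          CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
      // Use the fair ordering policy on the (assumed) queue root.a, the case
      // this commit adds support for via TAFairOrderingComparator.
      conf.set(CapacitySchedulerConfiguration.PREFIX
          + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair");
      return conf;
    }
  }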


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c343669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 40f333f..12c178c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -34,6 +34,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAFairOrderingComparator;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -263,8 +266,17 @@ public class FifoIntraQueuePreemptionPlugin
       Resource queueReassignableResource,
       PriorityQueue<TempAppPerPartition> orderedByPriority) {
 
-    Comparator<TempAppPerPartition> reverseComp = Collections
-        .reverseOrder(new TAPriorityComparator());
+    Comparator<TempAppPerPartition> reverseComp;
+    OrderingPolicy<FiCaSchedulerApp> queueOrderingPolicy =
+        tq.leafQueue.getOrderingPolicy();
+    if (queueOrderingPolicy instanceof FairOrderingPolicy
+        && (context.getIntraQueuePreemptionOrderPolicy()
+            == IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
+      reverseComp = Collections.reverseOrder(
+          new TAFairOrderingComparator(this.rc, clusterResource));
+    } else {
+      reverseComp = Collections.reverseOrder(new TAPriorityComparator());
+    }
     TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
 
     String partition = tq.partition;
@@ -355,7 +367,16 @@ public class FifoIntraQueuePreemptionPlugin
       TempQueuePerPartition tq, Collection<FiCaSchedulerApp> apps,
       Resource clusterResource,
       Map<String, Resource> perUserAMUsed) {
-    TAPriorityComparator taComparator = new TAPriorityComparator();
+    Comparator<TempAppPerPartition> taComparator;
+    OrderingPolicy<FiCaSchedulerApp> orderingPolicy =
+        tq.leafQueue.getOrderingPolicy();
+    if (orderingPolicy instanceof FairOrderingPolicy
+        && (context.getIntraQueuePreemptionOrderPolicy()
+            == IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
+      taComparator = new TAFairOrderingComparator(this.rc, clusterResource);
+    } else {
+      taComparator = new TAPriorityComparator();
+    }
     PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
         100, taComparator);
 
@@ -393,13 +414,12 @@ public class FifoIntraQueuePreemptionPlugin
       // Set ideal allocation of app as 0.
       tmpApp.idealAssigned = Resources.createResource(0, 0);
 
-      orderedByPriority.add(tmpApp);
-
       // Create a TempUserPerPartition structure to hold more information
       // regarding each user's entities such as UserLimit etc. This could
       // be kept in a user to TempUserPerPartition map for further reference.
       String userName = app.getUser();
-      if (!usersPerPartition.containsKey(userName)) {
+      TempUserPerPartition tmpUser = usersPerPartition.get(userName);
+      if (tmpUser == null) {
         ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
             .getResourceUsage();
 
@@ -409,7 +429,7 @@ public class FifoIntraQueuePreemptionPlugin
         amUsed = (userSpecificAmUsed == null)
             ? Resources.none() : userSpecificAmUsed;
 
-        TempUserPerPartition tmpUser = new TempUserPerPartition(
+        tmpUser = new TempUserPerPartition(
             tq.leafQueue.getUser(userName), tq.queueName,
             Resources.clone(userResourceUsage.getUsed(partition)),
             Resources.clone(amUsed),
@@ -432,7 +452,10 @@ public class FifoIntraQueuePreemptionPlugin
         tmpUser.idealAssigned = Resources.createResource(0, 0);
         tq.addUserPerPartition(userName, tmpUser);
       }
+      tmpApp.setTempUserPerPartition(tmpUser);
+      orderedByPriority.add(tmpApp);
     }
+
     return orderedByPriority;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c343669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index a91fac7..8ab9507 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.Serializable;
@@ -64,6 +66,44 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
     }
   }
 
+  /*
+   * Order first by amount used from least to most. Then order from oldest to
+   * youngest if amount used is the same.
+   */
+  static class TAFairOrderingComparator
+      implements Comparator<TempAppPerPartition> {
+
+    private ResourceCalculator rc;
+    private Resource clusterRes;
+
+    TAFairOrderingComparator(ResourceCalculator rc, Resource clusterRes) {
+      this.rc = rc;
+      this.clusterRes = clusterRes;
+    }
+
+    @Override
+    public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
+      if (ta1.getUser().equals(ta2.getUser())) {
+        AbstractComparatorOrderingPolicy<FiCaSchedulerApp> acop =
+            (AbstractComparatorOrderingPolicy<FiCaSchedulerApp>)
+            ta1.getFiCaSchedulerApp().getCSLeafQueue().getOrderingPolicy();
+        return acop.getComparator()
+                  .compare(ta1.getFiCaSchedulerApp(), ta2.getFiCaSchedulerApp());
+      } else {
+        Resource usedByUser1 = ta1.getTempUserPerPartition().getUsedDeductAM();
+        Resource usedByUser2 = ta2.getTempUserPerPartition().getUsedDeductAM();
+        if (Resources.equals(usedByUser1, usedByUser2)) {
+          return ta1.getApplicationId().compareTo(ta2.getApplicationId());
+        }
+        if (Resources.lessThan(rc, clusterRes, usedByUser1, usedByUser2)) {
+          return -1;
+        } else {
+          return 1;
+        }
+      }
+    }
+  }
+
   IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null;
   final CapacitySchedulerPreemptionContext context;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c343669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
index e9a934b..05d8096 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
@@ -34,6 +34,7 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
   // Following fields are settled and used by candidate selection policies
   private final int priority;
   private final ApplicationId applicationId;
+  private TempUserPerPartition tempUser;
 
   FiCaSchedulerApp app;
 
@@ -102,4 +103,12 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
       Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
     }
   }
+
+  public void setTempUserPerPartition(TempUserPerPartition tu) {
+    tempUser = tu;
+  }
+
+  public TempUserPerPartition getTempUserPerPartition() {
+    return tempUser;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c343669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index b7cb1bf..09dd3bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
-import com.google.common.annotations.VisibleForTesting;
 
 
 /**
@@ -89,7 +88,6 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
     }
   }
 
-  @VisibleForTesting
   public Comparator<SchedulableEntity> getComparator() {
     return comparator; 
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c343669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index a972584..64b56fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -64,6 +65,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -337,9 +339,11 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
           .thenReturn(pendingForDefaultPartition);
 
       // need to set pending resource in resource usage as well
-      ResourceUsage ru = new ResourceUsage();
+      ResourceUsage ru = Mockito.spy(new ResourceUsage());
       ru.setUsed(label, used);
+      when(ru.getCachedUsed(anyString())).thenReturn(used);
       when(app.getAppAttemptResourceUsage()).thenReturn(ru);
+      when(app.getSchedulingResourceUsage()).thenReturn(ru);
 
       start = end + 1;
     }
@@ -637,6 +641,12 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         when(leafQueue.getApplications()).thenReturn(apps);
         when(leafQueue.getAllApplications()).thenReturn(apps);
         OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
+        String opName = conf.get(CapacitySchedulerConfiguration.PREFIX
+            + CapacitySchedulerConfiguration.ROOT + "." + getQueueName(q)
+            + ".ordering-policy", "fifo");
+        if (opName.equals("fair")) {
+          so = Mockito.spy(new FairOrderingPolicy<FiCaSchedulerApp>());
+        }
         when(so.getPreemptionIterator()).thenAnswer(new Answer() {
           public Object answer(InvocationOnMock invocation) {
             return apps.descendingIterator();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c343669/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
new file mode 100644
index 0000000..1678651
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+/*
+ * Test class for testing intra-queue preemption when the fair ordering policy
+ * is enabled on a capacity queue.
+ */
+public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
+  @Before
+  public void setup() {
+    super.setup();
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+  }
+
+  /*
+   * When the capacity scheduler fair ordering policy is enabled, preempt first
+   * from the application owned by the user that is the farthest over their
+   * user limit.
+   */
+  @Test
+  public void testIntraQueuePreemptionFairOrderingPolicyEnabledOneAppPerUser()
+      throws IOException {
+    // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair");
+    // Make sure all containers will be preempted in a single round.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 1.0);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+            "-a(=[100 100 100 1 0])"; // a
+
+    // user1/app1 has 60 resources in queue a
+    // user2/app2 has 40 resources in queue a
+    // user3/app3 is requesting 20 resources in queue a
+    // With 3 users, preemptable user limit should be around 35 resources each.
+    // With FairOrderingPolicy enabled on queue a, all 20 resources should be
+    // preempted from app1
+    String appsConfig =
+    // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+        "a\t" // app1, user1 in a
+            + "(1,1,n1,,60,false,0,user1);" +
+            "a\t" // app2, user2 in a
+            + "(1,1,n1,,40,false,0,user2);" +
+            "a\t" // app3, user3 in a
+            + "(1,1,n1,,0,false,20,user3)"
+            ;
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(20)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
+
+  /*
+   * When the capacity scheduler fifo ordering policy is enabled, preempt first
+   * from the youngest application until it is reduced to its user limit, then
+   * preempt from the next youngest app.
+   */
+  @Test
+  public void testIntraQueuePreemptionFifoOrderingPolicyEnabled()
+      throws IOException {
+    // Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo");
+    // Make sure all containers will be preempted in a single round.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 1.0);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+            "-a(=[100 100 100 1 0])"; // a
+
+    // user1/app1 has 60 resources in queue a
+    // user2/app2 has 40 resources in queue a
+    // user3/app3 is requesting 20 resources in queue a
+    // With 3 users, preemptable user limit should be around 35 resources each.
+    // With FifoOrderingPolicy enabled on queue a, the first 5 should come from
+    // the youngest app, app2, until app2 is reduced to the user limit of 35.
+    String appsConfig =
+    // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+        "a\t" // app1, user1 in a
+            + "(1,1,n1,,60,false,0,user1);" +
+            "a\t" // app2, user2 in a
+            + "(1,1,n1,,40,false,0,user2);" +
+            "a\t" // app3, user3 in a
+            + "(1,1,n1,,0,false,5,user3)"
+            ;
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(5)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+
+    // user1/app1 has 60 resources in queue a
+    // user2/app2 has 35 resources in queue a
+    // user3/app3 has 5 resources and is requesting 15 resources in queue a
+    // With 3 users, preemptable user limit should be around 35 resources each.
+    // The next 15 should come from app1 even though app2 is younger since app2
+    // has already been reduced to its user limit.
+    appsConfig =
+    // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+        "a\t" // app1, user1 in a
+            + "(1,1,n1,,60,false,0,user1);" +
+            "a\t" // app2, user2 in a
+            + "(1,1,n1,,35,false,0,user2);" +
+            "a\t" // app3, user3 in a
+            + "(1,1,n1,,5,false,15,user3)"
+            ;
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
+
+  /*
+   * When the capacity scheduler fair ordering policy is enabled, preempt first
+   * from the youngest application of the user that is the farthest over their
+   * user limit.
+   */
+  @Test
+  public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsPerUser()
+      throws IOException {
+    // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair");
+    // Make sure all containers will be preempted in a single round.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 1.0);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+            "-a(=[100 100 100 1 0])"; // a
+
+    // user1/app1 has 35 resources in queue a
+    // user1/app2 has 25 resources in queue a
+    // user2/app3 has 40 resources in queue a
+    // user3/app4 is requesting 20 resources in queue a
+    // With 3 users, preemptable user limit should be around 35 resources each.
+    // With FairOrderingPolicy enabled on queue a, all 20 resources should be
+    // preempted from app1 since it's the most over-served app from the most
+    // over-served user.
+    String appsConfig =
+    // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+        "a\t" // app1 and app2, user1 in a
+            + "(1,1,n1,,35,false,0,user1);" +
+            "a\t"
+            + "(1,1,n1,,25,false,0,user1);" +
+            "a\t" // app3, user2 in a
+            + "(1,1,n1,,40,false,0,user2);" +
+            "a\t" // app4, user3 in a
+            + "(1,1,n1,,0,false,20,user3)"
+            ;
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(20)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
+
+  /*
+   * When the capacity scheduler fifo ordering policy is enabled and a user has
+   * multiple apps, preempt first from the youngest application.
+   */
+  @Test
+  public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser()
+      throws IOException {
+    // Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo");
+    // Make sure all containers will be preempted in a single round.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 1.0);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+            "-a(=[100 100 100 1 0])"; // a
+
+    // user1/app1 has 40 resources in queue a
+    // user1/app2 has 20 resources in queue a
+    // user3/app3 has 40 resources in queue a
+    // user4/app4 is requesting 25 resources in queue a
+    // With 3 users, preemptable user limit should be around 35 resources each.
+    String appsConfig =
+    // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user)
+        "a\t" // app1, user1 in a
+            + "(1,1,n1,,40,false,0,user1);" +
+        "a\t" // app2, user1 in a
+            + "(1,1,n1,,20,false,0,user1);" +
+        "a\t" // app3, user3 in a
+            + "(1,1,n1,,40,false,0,user3);" +
+        "a\t" // app4, user4 in a
+            + "(1,1,n1,,0,false,25,user4)"
+            ;
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // app3 is the youngest and also over its user limit. 5 should be preempted
+    // from app3 until it comes down to user3's user limit.
+    verify(mDisp, times(5)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+
+    // User1's app2 is its youngest. 19 should be preempted from app2, leaving
+    // only the AM.
+    verify(mDisp, times(19)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+
+    // Preempt the remaining resource from User1's oldest app1.
+    verify(mDisp, times(1)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
+}
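
For anyone trying this on a real cluster rather than through the mock framework above, the same behavior is driven by the scheduling monitor, the intra-queue preemption switch, and the queue's ordering policy. A rough sketch follows; the property name strings are assumptions based on the usual CapacityScheduler preemption keys and should be verified against CapacitySchedulerConfiguration for your Hadoop version, the class name is hypothetical, and the queue root.a is only an example:

  import org.apache.hadoop.conf.Configuration;

  public class ClusterSidePreemptionSetup {
    public static Configuration sketch() {
      Configuration conf = new Configuration();
      // Run the scheduling monitors (ProportionalCapacityPreemptionPolicy) -- assumed key.
      conf.setBoolean("yarn.resourcemanager.scheduler.monitor.enable", true);
      // Allow preemption between applications inside the same queue -- assumed key.
      conf.setBoolean("yarn.resourcemanager.monitor.capacity.preemption"
          + ".intra-queue-preemption.enabled", true);
      // Order preemption candidates in queue root.a with the fair policy.
      conf.set("yarn.scheduler.capacity.root.a.ordering-policy", "fair");
      return conf;
    }
  }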

