Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/01/20 01:55:41 UTC

hadoop git commit: YARN-2933. Capacity Scheduler preemption policy should only consider capacity without labels temporarily. Contributed by Mayank Bansal

Repository: hadoop
Updated Branches:
  refs/heads/trunk cb0a15d20 -> 0a2d3e717


YARN-2933. Capacity Scheduler preemption policy should only consider capacity without labels temporarily. Contributed by Mayank Bansal


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a2d3e71
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a2d3e71
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a2d3e71

Branch: refs/heads/trunk
Commit: 0a2d3e717d9c42090a32ff177991a222a1e34132
Parents: cb0a15d
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Jan 19 16:48:50 2015 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Mon Jan 19 16:48:50 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../ProportionalCapacityPreemptionPolicy.java   |  53 +++++++++-
 ...estProportionalCapacityPreemptionPolicy.java | 106 +++++++++++++++++--
 3 files changed, 149 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
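Summary of the change for readers skimming the diff: before the existing preemption math in containerBasedPreemptOrKill() runs, the policy now (a) shrinks the cluster resource it reasons about to the capacity that carries no node label, and (b) skips any container placed on a labeled node when picking preemption candidates. The stand-alone sketch below models that idea with plain Java types; the class name and the string-keyed maps are illustrative stand-ins, not the YARN NodeId/Resource classes the patch actually uses.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Simplified model of the YARN-2933 behaviour: ignore labeled capacity and
 * labeled containers when deciding what to preempt. All types here are
 * stand-ins for illustration only.
 */
public class NonLabeledPreemptionSketch {

  /** Cluster capacity minus the capacity of every node that carries a label. */
  static int nonLabeledCapacity(Map<String, Integer> nodeMemory,
      Map<String, Set<String>> nodeLabels) {
    int total = 0;
    for (Map.Entry<String, Integer> e : nodeMemory.entrySet()) {
      if (!nodeLabels.containsKey(e.getKey())) {
        total += e.getValue();              // count unlabeled nodes only
      }
    }
    return total;
  }

  /** Preemption candidates: containers that are not on a labeled node. */
  static List<String> preemptable(Map<String, String> containerToNode,
      Map<String, Set<String>> nodeLabels) {
    List<String> candidates = new ArrayList<String>();
    for (Map.Entry<String, String> e : containerToNode.entrySet()) {
      if (!nodeLabels.containsKey(e.getValue())) {
        candidates.add(e.getKey());         // skip containers on labeled nodes
      }
    }
    return candidates;
  }

  public static void main(String[] args) {
    Map<String, Integer> nodeMemory = new HashMap<String, Integer>();
    nodeMemory.put("node1", 80);
    nodeMemory.put("node2", 20);

    Map<String, Set<String>> nodeLabels = new HashMap<String, Set<String>>();
    nodeLabels.put("node1", Collections.singleton("x"));  // node1 is labeled

    Map<String, String> containerToNode = new HashMap<String, String>();
    containerToNode.put("container_1", "node1");
    containerToNode.put("container_2", "node2");

    System.out.println(nonLabeledCapacity(nodeMemory, nodeLabels));   // 20
    System.out.println(preemptable(containerToNode, nodeLabels));     // [container_2]
  }
}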


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a2d3e71/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5670963..a29d316 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -391,6 +391,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3015. yarn classpath command should support same options as hadoop
     classpath. (Contributed by Varun Saxena)
 
+    YARN-2933. Capacity Scheduler preemption policy should only consider capacity 
+    without labels temporarily. (Mayank Bansal via wangda)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a2d3e71/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 1a3f804..0743f60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -30,15 +30,19 @@ import java.util.NavigableSet;
 import java.util.PriorityQueue;
 import java.util.Set;
 
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
@@ -129,6 +133,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private Map<NodeId, Set<String>> labels;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -168,6 +173,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
+    labels = null;
   }
   
   @VisibleForTesting
@@ -176,14 +182,39 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   }
 
   @Override
-  public void editSchedule(){
+  public void editSchedule() {
     CSQueue root = scheduler.getRootQueue();
-    Resource clusterResources =
-      Resources.clone(scheduler.getClusterResource());
+    Resource clusterResources = Resources.clone(scheduler.getClusterResource());
+    clusterResources = getNonLabeledResources(clusterResources);
+    setNodeLabels(scheduler.getRMContext().getNodeLabelManager()
+        .getNodeLabels());
     containerBasedPreemptOrKill(root, clusterResources);
   }
 
   /**
+   * Setting Node Labels
+   * 
+   * @param nodelabels
+   */
+  public void setNodeLabels(Map<NodeId, Set<String>> nodelabels) {
+    labels = nodelabels;
+  }
+
+  /**
+   * This method returns all non labeled resources.
+   * 
+   * @param clusterResources
+   * @return Resources
+   */
+  private Resource getNonLabeledResources(Resource clusterResources) {
+    RMContext rmcontext = scheduler.getRMContext();
+    RMNodeLabelsManager lm = rmcontext.getNodeLabelManager();
+    Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+        clusterResources);
+    return res == null ? clusterResources : res;
+  }
+  
+  /**
    * This method selects and tracks containers to be preempted. If a container
    * is in the target list for more than maxWaitTime it is killed.
    *
@@ -593,7 +624,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param app
    * @param clusterResource
    * @param rsrcPreempt
-   * @return
+   * @return Set<RMContainer> Set of RMContainers
    */
   private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
       Resource clusterResource, Resource rsrcPreempt,
@@ -635,12 +666,26 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         Resources.addTo(skippedAMSize, c.getContainer().getResource());
         continue;
       }
+      // skip Labeled resource
+      if(isLabeledContainer(c)){
+        continue;
+      }
       ret.add(c);
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }
 
     return ret;
   }
+  
+  /**
+   * Checking if given container is a labeled container
+   * 
+   * @param c
+   * @return true/false
+   */
+  private boolean isLabeledContainer(RMContainer c) {
+    return labels.containsKey(c.getAllocatedNode());
+  }
 
   /**
    * Compare by reversed priority order first, and then reversed containerId
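
The hunks above boil down to two small helpers plus one skip inside preemptFrom(). For orientation, here is a condensed restatement using only the API calls that appear in the diff (RMNodeLabelsManager.getResourceByLabel() with the NO_LABEL partition, getNodeLabels(), and RMContainer.getAllocatedNode()); the wrapper class and its constructor are hypothetical and need a live RMContext, so treat this as a sketch rather than drop-in scheduler code. Note that the skip is deliberately coarse while labels are excluded "temporarily": a container counts as labeled whenever its node appears as a key in the label map, regardless of which labels that node carries.

import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;

/** Hypothetical wrapper restating the patched logic for readability. */
class NonLabeledView {
  private final RMNodeLabelsManager labelsManager;
  private final Map<NodeId, Set<String>> nodeLabels;

  NonLabeledView(RMContext rmContext) {
    this.labelsManager = rmContext.getNodeLabelManager();
    // editSchedule() refreshes this snapshot once per scheduling round.
    this.nodeLabels = labelsManager.getNodeLabels();
  }

  /** Capacity of the unlabeled (NO_LABEL) partition, falling back to the
   *  full cluster resource when the label manager reports nothing. */
  Resource nonLabeled(Resource clusterResources) {
    Resource res = labelsManager.getResourceByLabel(
        RMNodeLabelsManager.NO_LABEL, clusterResources);
    return res == null ? clusterResources : res;
  }

  /** Mirrors isLabeledContainer(): the container's node merely has to be a
   *  key in the label map for the container to be left alone. */
  boolean onLabeledNode(RMContainer c) {
    return nodeLabels.containsKey(c.getAllocatedNode());
  }
}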

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a2d3e71/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index ca67ef0..0a147f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -38,27 +38,37 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@@ -72,12 +82,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
+import org.mortbay.log.Log;
 
 public class TestProportionalCapacityPreemptionPolicy {
 
@@ -85,14 +97,18 @@ public class TestProportionalCapacityPreemptionPolicy {
 
   int appAlloc = 0;
   boolean setAMContainer = false;
+  boolean setLabeledContainer = false;
   float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
   Configuration conf = null;
   CapacityScheduler mCS = null;
+  RMContext rmContext = null;
+  RMNodeLabelsManager lm = null;
   CapacitySchedulerConfiguration schedConf = null;
   EventHandler<ContainerPreemptEvent> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
+  Resource clusterResources = null;
   final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
       ApplicationId.newInstance(TS, 0), 0);
   final ApplicationAttemptId appB = ApplicationAttemptId.newInstance(
@@ -108,6 +124,19 @@ public class TestProportionalCapacityPreemptionPolicy {
   final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
     ArgumentCaptor.forClass(ContainerPreemptEvent.class);
 
+  public enum priority {
+    AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2);
+    int value;
+
+    private priority(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return this.value;
+    }
+  };  
+
   @Rule public TestName name = new TestName();
 
   @Before
@@ -130,8 +159,12 @@ public class TestProportionalCapacityPreemptionPolicy {
     mClock = mock(Clock.class);
     mCS = mock(CapacityScheduler.class);
     when(mCS.getResourceCalculator()).thenReturn(rc);
+    lm = mock(RMNodeLabelsManager.class);
     schedConf = new CapacitySchedulerConfiguration();
     when(mCS.getConfiguration()).thenReturn(schedConf);
+    rmContext = mock(RMContext.class);
+    when(mCS.getRMContext()).thenReturn(rmContext);
+    when(rmContext.getNodeLabelManager()).thenReturn(lm);
     mDisp = mock(EventHandler.class);
     rand = new Random();
     long seed = rand.nextLong();
@@ -746,7 +779,51 @@ public class TestProportionalCapacityPreemptionPolicy {
     verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
     setAMContainer = false;
   }
-  
+
+  @Test
+  public void testIdealAllocationForLabels() {
+    int[][] qData = new int[][] {
+    // / A B
+        { 80, 40, 40 }, // abs
+        { 80, 80, 80 }, // maxcap
+        { 80, 80, 0 }, // used
+        { 70, 20, 50 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 1, 1 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    setLabeledContainer = true;
+    Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
+    NodeId node = NodeId.newInstance("node1", 0);
+    Set<String> labelSet = new HashSet<String>();
+    labelSet.add("x");
+    labels.put(node, labelSet);
+    when(lm.getNodeLabels()).thenReturn(labels);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    // Subtracting Label X resources from cluster resources
+    when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
+        Resources.clone(Resource.newInstance(80, 0)));
+    clusterResources.setMemory(100);
+    policy.editSchedule();
+
+    // By skipping AM Container and Labeled container, all other 18 containers
+    // of appD will be preempted
+    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping AM Container and Labeled container, all other 18 containers
+    // of appC will be preempted
+    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // rest 4 containers from appB will be preempted
+    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    setAMContainer = false;
+    setLabeledContainer = false;
+  }
+
   @Test
   public void testPreemptSkippedAMContainers() {
     int[][] qData = new int[][] {
@@ -846,7 +923,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);
 
-    Resource clusterResources =
+    clusterResources =
       Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     when(mCS.getClusterResource()).thenReturn(clusterResources);
     return policy;
@@ -965,7 +1042,8 @@ public class TestProportionalCapacityPreemptionPolicy {
     Resource unit = Resource.newInstance(gran, 0);
     List<RMContainer> cReserved = new ArrayList<RMContainer>();
     for (int i = 0; i < reserved; i += gran) {
-      cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
+      cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
+          .getValue()));
       ++cAlloc;
     }
     when(app.getReservedContainers()).thenReturn(cReserved);
@@ -973,9 +1051,16 @@ public class TestProportionalCapacityPreemptionPolicy {
     List<RMContainer> cLive = new ArrayList<RMContainer>();
     for (int i = 0; i < used; i += gran) {
       if(setAMContainer && i == 0){
-        cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
-      }else{
-        cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+        cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER
+            .getValue()));
+      }else if(setLabeledContainer && i ==1){
+        cLive.add(mockContainer(appAttId, cAlloc, unit,
+            priority.LABELEDCONTAINER.getValue()));
+        ++used;
+      }
+      else{
+        cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
+            .getValue()));
       }
       ++cAlloc;
     }
@@ -984,18 +1069,21 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
-      Resource r, int priority) {
+      Resource r, int cpriority) {
     ContainerId cId = ContainerId.newContainerId(appAttId, id);
     Container c = mock(Container.class);
     when(c.getResource()).thenReturn(r);
-    when(c.getPriority()).thenReturn(Priority.create(priority));
+    when(c.getPriority()).thenReturn(Priority.create(cpriority));
     RMContainer mC = mock(RMContainer.class);
     when(mC.getContainerId()).thenReturn(cId);
     when(mC.getContainer()).thenReturn(c);
     when(mC.getApplicationAttemptId()).thenReturn(appAttId);
-    if(0 == priority){
+    if (priority.AMCONTAINER.getValue() == cpriority) {
       when(mC.isAMContainer()).thenReturn(true);
     }
+    if (priority.LABELEDCONTAINER.getValue() == cpriority) {
+      when(mC.getAllocatedNode()).thenReturn(NodeId.newInstance("node1", 0));
+    }
     return mC;
   }
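
The new test drives the policy through mocks rather than a real label store: RMNodeLabelsManager is mocked so that node1 carries label "x" and so that getResourceByLabel() reports 80 memory out of the 100-memory cluster set up by the test, which is what makes editSchedule() operate on the reduced, unlabeled capacity. The snippet below isolates that mocking pattern; the helper class and method name are invented for illustration, but every Mockito and YARN call in it already appears in the test above.

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;

/** Illustrative helper: the label-related mocking used by the new test. */
class LabelMockingSketch {

  static RMContext mockContextWithOneLabeledNode() {
    RMNodeLabelsManager lm = mock(RMNodeLabelsManager.class);

    // node1 carries label "x"; every other node is treated as unlabeled.
    Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
    labels.put(NodeId.newInstance("node1", 0), Collections.singleton("x"));
    when(lm.getNodeLabels()).thenReturn(labels);

    // Report 80 memory for whatever partition is asked about; the policy
    // only ever asks for the NO_LABEL partition, so this stands in for
    // "cluster capacity minus the labeled capacity".
    when(lm.getResourceByLabel(anyString(), any(Resource.class)))
        .thenReturn(Resource.newInstance(80, 0));

    RMContext rmContext = mock(RMContext.class);
    when(rmContext.getNodeLabelManager()).thenReturn(lm);
    return rmContext;
  }
}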