You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2016/01/11 07:38:59 UTC

hadoop git commit: YARN-3849. Too much preemption activity causing continuous killing of containers across queues. (Sunil G via wangda)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 b91715bc8 -> 58a6142c1


YARN-3849. Too much preemption activity causing continuous killing of containers across queues. (Sunil G via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58a6142c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58a6142c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58a6142c

Branch: refs/heads/branch-2.6
Commit: 58a6142c1455cb622ffc7d520df30d8f6dc5680b
Parents: b91715b
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Mon Jan 11 12:02:38 2016 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Mon Jan 11 12:02:38 2016 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../ProportionalCapacityPreemptionPolicy.java   |   3 +-
 ...estProportionalCapacityPreemptionPolicy.java | 198 +++++++++++++++----
 3 files changed, 165 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/58a6142c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e46f2f7..8f35efc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -43,6 +43,8 @@ Release 2.6.4 - UNRELEASED
     YARN-4180. AMLauncher does not retry on failures when talking to NM.
     (adhoot)
 
+    YARN-3849. Too much of preemption activity causing continuos killing of
+    containers across queues. (Sunil G via wangda)
 
 Release 2.6.3 - 2015-12-17
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58a6142c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 0f48b0c..854d2c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -632,11 +632,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     TempQueue ret;
     synchronized (root) {
       String queueName = root.getQueueName();
-      float absUsed = root.getAbsoluteUsedCapacity();
       float absCap = root.getAbsoluteCapacity();
       float absMaxCap = root.getAbsoluteMaximumCapacity();
 
-      Resource current = Resources.multiply(clusterResources, absUsed);
+      Resource current = root.getQueueResourceUsage().getUsed();
       Resource guaranteed = Resources.multiply(clusterResources, absCap);
       Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
       if (root instanceof LeafQueue) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58a6142c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 24e70bb..bca2def 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@@ -67,7 +68,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -382,7 +385,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // we verify both that C has priority on B and D (has it has >0 guarantees)
     // and that B and D are force to share their over capacity fairly (as they
     // are both zero-guarantees) hence D sees some of its containers preempted
-    verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
   }
   
   
@@ -407,8 +410,8 @@ public class TestProportionalCapacityPreemptionPolicy {
 
     // XXX note: compensating for rounding error in Resources.multiplyTo
     // which is likely triggered since we use small numbers for readability
-    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
-    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
   }
 
   @Test
@@ -571,7 +574,35 @@ public class TestProportionalCapacityPreemptionPolicy {
     verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
     setAMContainer = false;
   }
-  
+
+  @Test
+  public void testPreemptionWithVCoreResource() {
+    int[][] qData = new int[][] {
+        // / A B
+        { 100, 100, 100 }, // maxcap
+        { 5, 1, 1 }, // apps
+        { 2, 0, 0 }, // subqueues
+    };
+
+    // Resources can be set like memory:vcores
+    String[][] resData = new String[][] {
+        // / A B
+        { "100:100", "50:50", "50:50" },// abs
+        { "10:100", "10:100", "0" },    // used
+        { "70:20", "70:20", "10:100" }, // pending
+        { "0", "0", "0" },              // reserved
+        { "-1", "1:10", "1:10" },       // req granularity
+    };
+
+    // Passing last param as TRUE to use DominantResourceCalculator
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData,
+        true);
+    policy.editSchedule();
+
+    // 5 containers will be preempted here
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
@@ -598,37 +629,103 @@ public class TestProportionalCapacityPreemptionPolicy {
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
     ProportionalCapacityPreemptionPolicy policy =
       new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+    Resource clusterResources =
+        Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);
 
-    Resource clusterResources =
-      Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     when(mCS.getClusterResource()).thenReturn(clusterResources);
     return policy;
   }
 
+  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+      String[][] resData) {
+    return buildPolicy(qData, resData, false);
+  }
+
+  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+      String[][] resData, boolean useDominantResourceCalculator) {
+    if (useDominantResourceCalculator) {
+      when(mCS.getResourceCalculator()).thenReturn(
+          new DominantResourceCalculator());
+    }
+    ProportionalCapacityPreemptionPolicy policy =
+        new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+    Resource clusterResources = leafAbsCapacities(
+        parseResourceDetails(resData[0]), qData[2]);
+    when(mCS.getClusterResource()).thenReturn(clusterResources);
+    ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
+    when(mCS.getRootQueue()).thenReturn(mRoot);
+
+    return policy;
+  }
+
   ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
-    int[] abs      = queueData[0];
-    int[] maxCap   = queueData[1];
-    int[] used     = queueData[2];
-    int[] pending  = queueData[3];
-    int[] reserved = queueData[4];
-    int[] apps     = queueData[5];
-    int[] gran     = queueData[6];
-    int[] queues   = queueData[7];
+    Resource[] abs = generateResourceList(queueData[0]);
+    Resource[] used = generateResourceList(queueData[2]);
+    Resource[] pending = generateResourceList(queueData[3]);
+    Resource[] reserved = generateResourceList(queueData[4]);
+    Resource[] gran = generateResourceList(queueData[6]);
+    int[] maxCap = queueData[1];
+    int[] apps = queueData[5];
+    int[] queues = queueData[7];
+
+    return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
+  }
+
+  private ParentQueue buildMockRootQueue(Random rand2, String[][] resData,
+      int[][] queueData) {
+    Resource[] abs = parseResourceDetails(resData[0]);
+    Resource[] used = parseResourceDetails(resData[1]);
+    Resource[] pending = parseResourceDetails(resData[2]);
+    Resource[] reserved = parseResourceDetails(resData[3]);
+    Resource[] gran = parseResourceDetails(resData[4]);
+    int[] maxCap = queueData[0];
+    int[] apps = queueData[1];
+    int[] queues = queueData[2];
+
+    return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
+  }
+
+  Resource[] parseResourceDetails(String[] resData) {
+    List<Resource> resourceList = new ArrayList<Resource>();
+    for (int i = 0; i < resData.length; i++) {
+      String[] resource = resData[i].split(":");
+      if (resource.length == 1) {
+        resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0));
+      } else {
+        resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]),
+            Integer.valueOf(resource[1])));
+      }
+    }
+    return resourceList.toArray(new Resource[resourceList.size()]);
+  }
 
-    return mockNested(abs, maxCap, used, pending,  reserved, apps, gran, queues);
+  Resource[] generateResourceList(int[] qData) {
+    List<Resource> resourceList = new ArrayList<Resource>();
+    for (int i = 0; i < qData.length; i++) {
+      resourceList.add(Resource.newInstance(qData[i], 0));
+    }
+    return resourceList.toArray(new Resource[resourceList.size()]);
   }
 
-  ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
-      int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
-    float tot = leafAbsCapacities(abs, queues);
+  ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
+      Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran,
+      int[] queues) {
+    ResourceCalculator rc = mCS.getResourceCalculator();
+    Resource tot = leafAbsCapacities(abs, queues);
     Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
     ParentQueue root = mockParentQueue(null, queues[0], pqs);
+    ResourceUsage resUsage = new ResourceUsage();
+    resUsage.setUsed(used[0]);
     when(root.getQueueName()).thenReturn("/");
-    when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
-    when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
-    when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+    when(root.getAbsoluteUsedCapacity()).thenReturn(
+        Resources.divide(rc, tot, used[0], tot));
+    when(root.getAbsoluteCapacity()).thenReturn(
+        Resources.divide(rc, tot, abs[0], tot));
+    when(root.getAbsoluteMaximumCapacity()).thenReturn(
+        maxCap[0] / (float) tot.getMemory());
+    when(root.getQueueResourceUsage()).thenReturn(resUsage);
 
     for (int i = 1; i < queues.length; ++i) {
       final CSQueue q;
@@ -636,14 +733,20 @@ public class TestProportionalCapacityPreemptionPolicy {
       final String queueName = "queue" + ((char)('A' + i - 1));
       if (queues[i] > 0) {
         q = mockParentQueue(p, queues[i], pqs);
+        ResourceUsage resUsagePerQueue = new ResourceUsage();
+        resUsagePerQueue.setUsed(used[i]);
+        when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
       } else {
         q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
       }
       when(q.getParent()).thenReturn(p);
       when(q.getQueueName()).thenReturn(queueName);
-      when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
-      when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
-      when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      when(q.getAbsoluteUsedCapacity()).thenReturn(
+          Resources.divide(rc, tot, used[i], tot));
+      when(q.getAbsoluteCapacity()).thenReturn(
+          Resources.divide(rc, tot, abs[i], tot));
+      when(q.getAbsoluteMaximumCapacity()).thenReturn(
+          maxCap[i] / (float) tot.getMemory());
     }
     assert 0 == pqs.size();
     return root;
@@ -663,11 +766,17 @@ public class TestProportionalCapacityPreemptionPolicy {
     return pq;
   }
 
-  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, 
-      int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
+  LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs,
+      Resource[] used, Resource[] pending, Resource[] reserved, int[] apps,
+      Resource[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
-    when(lq.getTotalResourcePending()).thenReturn(
-        Resource.newInstance(pending[i], 0));
+    ResourceCalculator rc = mCS.getResourceCalculator();
+    when(lq.getTotalResourcePending()).thenReturn(pending[i]);
+    // need to set pending resource in resource usage as well
+    ResourceUsage ru = new ResourceUsage();
+    ru.setPending(pending[i]);
+    ru.setUsed(used[i]);
+    when(lq.getQueueResourceUsage()).thenReturn(ru);
     // consider moving where CapacityScheduler::comparator accessible
     NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
       new Comparator<FiCaSchedulerApp>() {
@@ -679,9 +788,9 @@ public class TestProportionalCapacityPreemptionPolicy {
       });
     // applications are added in global L->R order in queues
     if (apps[i] != 0) {
-      int aUsed    = used[i] / apps[i];
-      int aPending = pending[i] / apps[i];
-      int aReserve = reserved[i] / apps[i];
+      Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]);
+      Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]);
+      Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]);
       for (int a = 0; a < apps[i]; ++a) {
         qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
         ++appAlloc;
@@ -695,9 +804,10 @@ public class TestProportionalCapacityPreemptionPolicy {
     return lq;
   }
 
-  FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
-      int gran) {
+  FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending,
+      Resource reserved, Resource gran) {
     FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+    ResourceCalculator rc = mCS.getResourceCalculator();
 
     ApplicationId appId = ApplicationId.newInstance(TS, id);
     ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
@@ -705,22 +815,28 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(app.getApplicationAttemptId()).thenReturn(appAttId);
 
     int cAlloc = 0;
-    Resource unit = Resource.newInstance(gran, 0);
+    Resource unit = gran;
     List<RMContainer> cReserved = new ArrayList<RMContainer>();
-    for (int i = 0; i < reserved; i += gran) {
+    Resource resIter = Resource.newInstance(0, 0);
+    for (; Resources.lessThan(rc, mCS.getClusterResource(), resIter, reserved); Resources
+        .addTo(resIter, gran)) {
       cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
       ++cAlloc;
     }
     when(app.getReservedContainers()).thenReturn(cReserved);
 
     List<RMContainer> cLive = new ArrayList<RMContainer>();
-    for (int i = 0; i < used; i += gran) {
+    Resource usedIter = Resource.newInstance(0, 0);
+    int i = 0;
+    for (; Resources.lessThan(rc, mCS.getClusterResource(), usedIter, used); Resources
+        .addTo(usedIter, gran)) {
       if(setAMContainer && i == 0){
         cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
       }else{
         cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
       }
       ++cAlloc;
+      ++i;
     }
     when(app.getLiveContainers()).thenReturn(cLive);
     return app;
@@ -752,6 +868,16 @@ public class TestProportionalCapacityPreemptionPolicy {
     return ret;
   }
 
+  static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) {
+    Resource ret = Resource.newInstance(0, 0);
+    for (int i = 0; i < abs.length; ++i) {
+      if (0 == subqueues[i]) {
+        Resources.addTo(ret, abs[i]);
+      }
+    }
+    return ret;
+  }
+
   void printString(CSQueue nq, String indent) {
     if (nq instanceof ParentQueue) {
       System.out.println(indent + nq.getQueueName()