You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2016/10/28 18:04:40 UTC

[42/50] [abbrv] hadoop git commit: YARN-4743. FairSharePolicy breaks TimSort assumption. (Zephyr Guo and Yufei Gu via kasha)

YARN-4743. FairSharePolicy breaks TimSort assumption. (Zephyr Guo and Yufei Gu via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4df8ed63
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4df8ed63
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4df8ed63

Branch: refs/heads/HDFS-9806
Commit: 4df8ed63ed93f2542e4b48f521b0cc6624ab59c1
Parents: b2c4f24
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Thu Oct 27 17:42:44 2016 -0700
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Thu Oct 27 17:45:48 2016 -0700

----------------------------------------------------------------------
 .../fair/policies/FairSharePolicy.java          |  31 ++-
 .../scheduler/fair/TestSchedulingPolicy.java    | 228 +++++++++++++++++++
 2 files changed, 254 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4df8ed63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 6aa8405..f120f0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -63,7 +63,11 @@ public class FairSharePolicy extends SchedulingPolicy {
    * 
    * Schedulables above their min share are compared by (runningTasks / weight).
    * If all weights are equal, slots are given to the job with the fewest tasks;
-   * otherwise, jobs with more weight get proportionally more slots.
+   * otherwise, jobs with more weight get proportionally more slots. If a
+   * weight equals 0, we can't compare Schedulables by (resource usage/weight).
+   * There are two situations: 1) Both weights equal 0: slots are given to
+   * the one with less resource usage. 2) Only one weight equals 0: slots
+   * are given to the one with non-zero weight.
    */
   private static class FairShareComparator implements Comparator<Schedulable>,
       Serializable {
@@ -74,6 +78,7 @@ public class FairSharePolicy extends SchedulingPolicy {
     public int compare(Schedulable s1, Schedulable s2) {
       double minShareRatio1, minShareRatio2;
       double useToWeightRatio1, useToWeightRatio2;
+      double weight1, weight2;
       Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
           s1.getMinShare(), s1.getDemand());
       Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
@@ -86,10 +91,26 @@ public class FairSharePolicy extends SchedulingPolicy {
           / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
       minShareRatio2 = (double) s2.getResourceUsage().getMemorySize()
           / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
-      useToWeightRatio1 = s1.getResourceUsage().getMemorySize() /
-          s1.getWeights().getWeight(ResourceType.MEMORY);
-      useToWeightRatio2 = s2.getResourceUsage().getMemorySize() /
-          s2.getWeights().getWeight(ResourceType.MEMORY);
+
+      weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
+      weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
+      if (weight1 > 0.0 && weight2 > 0.0) {
+        useToWeightRatio1 = s1.getResourceUsage().getMemorySize() / weight1;
+        useToWeightRatio2 = s2.getResourceUsage().getMemorySize() / weight2;
+      } else { // Either weight1 or weight2 equals to 0
+        if (weight1 == weight2) {
+          // If they have same weight, just compare usage
+          useToWeightRatio1 = s1.getResourceUsage().getMemorySize();
+          useToWeightRatio2 = s2.getResourceUsage().getMemorySize();
+        } else {
+          // By setting useToWeightRatios to negative weights, we give the
+          // zero-weight one less priority, so the non-zero weight one will
+          // be given slots.
+          useToWeightRatio1 = -weight1;
+          useToWeightRatio2 = -weight2;
+        }
+      }
+
       int res = 0;
       if (s1Needy && !s2Needy)
         res = -1;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4df8ed63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
index eeedb09..dea2dd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
@@ -21,13 +21,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestSchedulingPolicy {
+  private static final Log LOG = LogFactory.getLog(TestSchedulingPolicy.class);
 
   @Test(timeout = 1000)
   public void testParseSchedulingPolicy()
@@ -125,4 +137,220 @@ public class TestSchedulingPolicy {
     assertFalse(ERR,
         SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
   }
+
+  /**
+   * Test whether {@link FairSharePolicy.FairShareComparator} is transitive.
+   */
+  @Test
+  public void testFairShareComparatorTransitivity() {
+    FairSharePolicy policy = new FairSharePolicy();
+    Comparator<Schedulable> fairShareComparator = policy.getComparator();
+    FairShareComparatorTester tester =
+        new FairShareComparatorTester(fairShareComparator);
+    tester.testTransitivity();
+  }
+
+
+  /**
+   * This class is responsible for testing the transitivity of
+   * {@link FairSharePolicy.FairShareComparator}. We generate a lot of
+   * triples (each triple contains three {@link Schedulable}s), and then
+   * verify transitivity on each triple.
+   *
+   * <p>How to generate:</p>
+   * For each field in {@link Schedulable} we have a collection of candidate
+   * values. We combine these values to construct a {@link Schedulable}, and
+   * generate all possible triples by DFS (depth-first search). Enumerating
+   * every combination this way is intended to give 100% code coverage.
+   */
+  private class FairShareComparatorTester {
+    private Comparator<Schedulable> fairShareComparator;
+
+    // Use the following data collections to generate three Schedulable.
+    private Resource minShare = Resource.newInstance(0, 1);
+
+    private Resource demand = Resource.newInstance(4, 1);
+
+    private String[] nameCollection = {"A", "B", "C"};
+
+    private long[] startTimeColloection = {1L, 2L, 3L};
+
+    private Resource[] usageCollection = {
+        Resource.newInstance(0, 1), Resource.newInstance(2, 1),
+        Resource.newInstance(4, 1) };
+
+    private ResourceWeights[] weightsCollection = {
+        new ResourceWeights(0.0f), new ResourceWeights(1.0f),
+        new ResourceWeights(2.0f) };
+
+
+
+    public FairShareComparatorTester(
+        Comparator<Schedulable> fairShareComparator) {
+      this.fairShareComparator = fairShareComparator;
+    }
+
+    public void testTransitivity() {
+      generateAndTest(new Stack<Schedulable>());
+    }
+
+    private void generateAndTest(Stack<Schedulable> genSchedulable) {
+      if (genSchedulable.size() == 3) {
+        // We get three Schedulable objects, let's use them to check the
+        // comparator.
+        Assert.assertTrue("The comparator must ensure transitivity",
+            checkTransitivity(genSchedulable));
+        return;
+      }
+
+      for (int i = 0; i < nameCollection.length; i++) {
+        for (int j = 0; j < startTimeColloection.length; j++) {
+          for (int k = 0; k < usageCollection.length; k++) {
+            for (int t = 0; t < weightsCollection.length; t++) {
+              genSchedulable.push(createSchedulable(i, j, k, t));
+              generateAndTest(genSchedulable);
+              genSchedulable.pop();
+            }
+          }
+        }
+      }
+
+    }
+
+    private Schedulable createSchedulable(
+        int nameIdx, int startTimeIdx, int usageIdx, int weightsIdx) {
+      return new MockSchedulable(minShare, demand, nameCollection[nameIdx],
+        startTimeColloection[startTimeIdx], usageCollection[usageIdx],
+        weightsCollection[weightsIdx]);
+    }
+
+    private boolean checkTransitivity(
+        Collection<Schedulable> schedulableObjs) {
+
+      Assert.assertEquals(3, schedulableObjs.size());
+      Schedulable[] copy = schedulableObjs.toArray(new Schedulable[3]);
+
+      if (fairShareComparator.compare(copy[0], copy[1]) > 0) {
+        swap(copy, 0, 1);
+      }
+
+      if (fairShareComparator.compare(copy[1], copy[2]) > 0) {
+        swap(copy, 1, 2);
+
+        if (fairShareComparator.compare(copy[0], copy[1]) > 0) {
+          swap(copy, 0, 1);
+        }
+      }
+
+      // Here, we have got the following condition:
+      // copy[0] <= copy[1] && copy[1] <= copy[2]
+      //
+      // So, just check copy[0] <= copy[2]
+      if (fairShareComparator.compare(copy[0], copy[2]) <= 0) {
+        return true;
+      } else {
+        LOG.fatal("Failure data: " + copy[0] + " " + copy[1] + " " + copy[2]);
+        return false;
+      }
+    }
+
+    private void swap(Schedulable[] array, int x, int y) {
+      Schedulable tmp = array[x];
+      array[x] = array[y];
+      array[y] = tmp;
+    }
+
+
+    private class MockSchedulable implements Schedulable {
+      private Resource minShare;
+      private Resource demand;
+      private String name;
+      private long startTime;
+      private Resource usage;
+      private ResourceWeights weights;
+
+      public MockSchedulable(Resource minShare, Resource demand, String name,
+          long startTime, Resource usage, ResourceWeights weights) {
+        this.minShare = minShare;
+        this.demand = demand;
+        this.name = name;
+        this.startTime = startTime;
+        this.usage = usage;
+        this.weights = weights;
+      }
+
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public Resource getDemand() {
+        return demand;
+      }
+
+      @Override
+      public Resource getResourceUsage() {
+        return usage;
+      }
+
+      @Override
+      public Resource getMinShare() {
+        return minShare;
+      }
+
+      @Override
+      public ResourceWeights getWeights() {
+        return weights;
+      }
+
+      @Override
+      public long getStartTime() {
+        return startTime;
+      }
+
+      @Override
+      public Resource getMaxShare() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Priority getPriority() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void updateDemand() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Resource assignContainer(FSSchedulerNode node) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public RMContainer preemptContainer() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Resource getFairShare() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void setFairShare(Resource fairShare) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public String toString() {
+        return "{name:" + name + ", start:" + startTime + ", usage:" + usage +
+            ", weights:" + weights + ", demand:" + demand +
+            ", minShare:" + minShare + "}";
+      }
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org