You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/07 19:24:23 UTC

[4/8] hadoop git commit: YARN-3098. Created common QueueCapacities class in Capacity Scheduler to track capacities-by-labels of queues. Contributed by Wangda Tan (cherry picked from commit 21d80b3dd90a8e33e51701887c8d9369ed4ab17d)

YARN-3098. Created common QueueCapacities class in Capacity Scheduler to track capacities-by-labels of queues. Contributed by Wangda Tan
(cherry picked from commit 21d80b3dd90a8e33e51701887c8d9369ed4ab17d)

(cherry picked from commit c0b1311a93614becc4a255af48fb7b697d491b80)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c94f071
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c94f071
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c94f071

Branch: refs/heads/branch-2.6.1
Commit: 4c94f0714012e1a7cfce1ea7b60d3c86f0b5cce1
Parents: d9281fb
Author: Jian He <ji...@apache.org>
Authored: Tue Feb 3 11:43:12 2015 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Sat Sep 5 20:54:20 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/ResourceUsage.java                |  80 +++-----
 .../scheduler/capacity/QueueCapacities.java     | 191 +++++++++++++++++++
 .../scheduler/capacity/TestQueueCapacities.java | 127 ++++++++++++
 4 files changed, 351 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c94f071/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index bac13ff..58c2139 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -19,6 +19,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3092. Created a common ResourceUsage class to track labeled resource
     usages in Capacity Scheduler. (Wangda Tan via jianhe)
 
+    YARN-3098. Created common QueueCapacities class in Capacity Scheduler to
+    track capacities-by-labels of queues. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c94f071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 5a4cced..c651878 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -56,14 +56,10 @@ public class ResourceUsage {
   private enum ResourceType {
     USED(0), PENDING(1), AMUSED(2), RESERVED(3), HEADROOM(4);
 
-    private int value;
+    private int idx;
 
     private ResourceType(int value) {
-      this.value = value;
-    }
-
-    public int getValue() {
-      return this.value;
+      this.idx = value;
     }
   }
 
@@ -77,22 +73,6 @@ public class ResourceUsage {
         resArr[i] = Resource.newInstance(0, 0);
       }
     }
-
-    public Resource get(ResourceType type) {
-      return resArr[type.getValue()];
-    }
-
-    public void set(ResourceType type, Resource res) {
-      resArr[type.getValue()] = res;
-    }
-
-    public void inc(ResourceType type, Resource res) {
-      Resources.addTo(resArr[type.getValue()], res);
-    }
-
-    public void dec(ResourceType type, Resource res) {
-      Resources.subtractFrom(resArr[type.getValue()], res);
-    }
   }
 
   /*
@@ -103,11 +83,11 @@ public class ResourceUsage {
   }
 
   public Resource getUsed(String label) {
-    return internalGet(label, ResourceType.USED);
+    return _get(label, ResourceType.USED);
   }
 
   public void incUsed(String label, Resource res) {
-    internalInc(label, ResourceType.USED, res);
+    _inc(label, ResourceType.USED, res);
   }
 
   public void incUsed(Resource res) {
@@ -119,7 +99,7 @@ public class ResourceUsage {
   }
 
   public void decUsed(String label, Resource res) {
-    internalDec(label, ResourceType.USED, res);
+    _dec(label, ResourceType.USED, res);
   }
 
   public void setUsed(Resource res) {
@@ -127,7 +107,7 @@ public class ResourceUsage {
   }
 
   public void setUsed(String label, Resource res) {
-    internalSet(label, ResourceType.USED, res);
+    _set(label, ResourceType.USED, res);
   }
 
   /*
@@ -138,11 +118,11 @@ public class ResourceUsage {
   }
 
   public Resource getPending(String label) {
-    return internalGet(label, ResourceType.PENDING);
+    return _get(label, ResourceType.PENDING);
   }
 
   public void incPending(String label, Resource res) {
-    internalInc(label, ResourceType.PENDING, res);
+    _inc(label, ResourceType.PENDING, res);
   }
 
   public void incPending(Resource res) {
@@ -154,7 +134,7 @@ public class ResourceUsage {
   }
 
   public void decPending(String label, Resource res) {
-    internalDec(label, ResourceType.PENDING, res);
+    _dec(label, ResourceType.PENDING, res);
   }
 
   public void setPending(Resource res) {
@@ -162,7 +142,7 @@ public class ResourceUsage {
   }
 
   public void setPending(String label, Resource res) {
-    internalSet(label, ResourceType.PENDING, res);
+    _set(label, ResourceType.PENDING, res);
   }
 
   /*
@@ -173,11 +153,11 @@ public class ResourceUsage {
   }
 
   public Resource getReserved(String label) {
-    return internalGet(label, ResourceType.RESERVED);
+    return _get(label, ResourceType.RESERVED);
   }
 
   public void incReserved(String label, Resource res) {
-    internalInc(label, ResourceType.RESERVED, res);
+    _inc(label, ResourceType.RESERVED, res);
   }
 
   public void incReserved(Resource res) {
@@ -189,7 +169,7 @@ public class ResourceUsage {
   }
 
   public void decReserved(String label, Resource res) {
-    internalDec(label, ResourceType.RESERVED, res);
+    _dec(label, ResourceType.RESERVED, res);
   }
 
   public void setReserved(Resource res) {
@@ -197,7 +177,7 @@ public class ResourceUsage {
   }
 
   public void setReserved(String label, Resource res) {
-    internalSet(label, ResourceType.RESERVED, res);
+    _set(label, ResourceType.RESERVED, res);
   }
 
   /*
@@ -208,11 +188,11 @@ public class ResourceUsage {
   }
 
   public Resource getHeadroom(String label) {
-    return internalGet(label, ResourceType.HEADROOM);
+    return _get(label, ResourceType.HEADROOM);
   }
 
   public void incHeadroom(String label, Resource res) {
-    internalInc(label, ResourceType.HEADROOM, res);
+    _inc(label, ResourceType.HEADROOM, res);
   }
 
   public void incHeadroom(Resource res) {
@@ -224,7 +204,7 @@ public class ResourceUsage {
   }
 
   public void decHeadroom(String label, Resource res) {
-    internalDec(label, ResourceType.HEADROOM, res);
+    _dec(label, ResourceType.HEADROOM, res);
   }
 
   public void setHeadroom(Resource res) {
@@ -232,7 +212,7 @@ public class ResourceUsage {
   }
 
   public void setHeadroom(String label, Resource res) {
-    internalSet(label, ResourceType.HEADROOM, res);
+    _set(label, ResourceType.HEADROOM, res);
   }
 
   /*
@@ -243,11 +223,11 @@ public class ResourceUsage {
   }
 
   public Resource getAMUsed(String label) {
-    return internalGet(label, ResourceType.AMUSED);
+    return _get(label, ResourceType.AMUSED);
   }
 
   public void incAMUsed(String label, Resource res) {
-    internalInc(label, ResourceType.AMUSED, res);
+    _inc(label, ResourceType.AMUSED, res);
   }
 
   public void incAMUsed(Resource res) {
@@ -259,7 +239,7 @@ public class ResourceUsage {
   }
 
   public void decAMUsed(String label, Resource res) {
-    internalDec(label, ResourceType.AMUSED, res);
+    _dec(label, ResourceType.AMUSED, res);
   }
 
   public void setAMUsed(Resource res) {
@@ -267,7 +247,7 @@ public class ResourceUsage {
   }
 
   public void setAMUsed(String label, Resource res) {
-    internalSet(label, ResourceType.AMUSED, res);
+    _set(label, ResourceType.AMUSED, res);
   }
 
   private static Resource normalize(Resource res) {
@@ -277,14 +257,14 @@ public class ResourceUsage {
     return res;
   }
 
-  private Resource internalGet(String label, ResourceType type) {
+  private Resource _get(String label, ResourceType type) {
     try {
       readLock.lock();
       UsageByLabel usage = usages.get(label);
       if (null == usage) {
         return Resources.none();
       }
-      return normalize(usage.get(type));
+      return normalize(usage.resArr[type.idx]);
     } finally {
       readLock.unlock();
     }
@@ -300,31 +280,31 @@ public class ResourceUsage {
     return usages.get(label);
   }
 
-  private void internalSet(String label, ResourceType type, Resource res) {
+  private void _set(String label, ResourceType type, Resource res) {
     try {
       writeLock.lock();
       UsageByLabel usage = getAndAddIfMissing(label);
-      usage.set(type, res);
+      usage.resArr[type.idx] = res;
     } finally {
       writeLock.unlock();
     }
   }
 
-  private void internalInc(String label, ResourceType type, Resource res) {
+  private void _inc(String label, ResourceType type, Resource res) {
     try {
       writeLock.lock();
       UsageByLabel usage = getAndAddIfMissing(label);
-      usage.inc(type, res);
+      Resources.addTo(usage.resArr[type.idx], res);
     } finally {
       writeLock.unlock();
     }
   }
 
-  private void internalDec(String label, ResourceType type, Resource res) {
+  private void _dec(String label, ResourceType type, Resource res) {
     try {
       writeLock.lock();
       UsageByLabel usage = getAndAddIfMissing(label);
-      usage.dec(type, res);
+      Resources.subtractFrom(usage.resArr[type.idx], res);
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c94f071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
new file mode 100644
index 0000000..a0e6d8c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+
+public class QueueCapacities {
+  private static final String NL = CommonNodeLabelsManager.NO_LABEL;
+  private static final float LABEL_DOESNT_EXIST_CAP = 0f;
+  private Map<String, Capacities> capacitiesMap;
+  private ReadLock readLock;
+  private WriteLock writeLock;
+
+  public QueueCapacities() {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+
+    capacitiesMap = new HashMap<String, Capacities>();
+  }
+  
+  // Use an enum here to make the implementation cleaner
+  private enum CapacityType {
+    USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5);
+
+    private int idx;
+
+    private CapacityType(int idx) {
+      this.idx = idx;
+    }
+  }
+
+  private static class Capacities {
+    private float[] capacitiesArr;
+    
+    public Capacities() {
+      capacitiesArr = new float[CapacityType.values().length];
+    }
+  }
+  
+  private float _get(String label, CapacityType type) {
+    try {
+      readLock.lock();
+      Capacities cap = capacitiesMap.get(label);
+      if (null == cap) {
+        return LABEL_DOESNT_EXIST_CAP;
+      }
+      return cap.capacitiesArr[type.idx];
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  private void _set(String label, CapacityType type, float value) {
+    try {
+      writeLock.lock();
+      Capacities cap = capacitiesMap.get(label);
+      if (null == cap) {
+        cap = new Capacities();
+        capacitiesMap.put(label, cap);
+      }
+      cap.capacitiesArr[type.idx] = value;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /* Used Capacity Getter and Setter */
+  public float getUsedCapacity() {
+    return _get(NL, CapacityType.USED_CAP);
+  }
+
+  public float getUsedCapacity(String label) {
+    return _get(label, CapacityType.USED_CAP);
+  }
+
+  public void setUsedCapacity(float value) {
+    _set(NL, CapacityType.USED_CAP, value);
+  }
+
+  public void setUsedCapacity(String label, float value) {
+    _set(label, CapacityType.USED_CAP, value);
+  }
+
+  /* Absolute Used Capacity Getter and Setter */
+  public float getAbsoluteUsedCapacity() {
+    return _get(NL, CapacityType.ABS_USED_CAP);
+  }
+
+  public float getAbsoluteUsedCapacity(String label) {
+    return _get(label, CapacityType.ABS_USED_CAP);
+  }
+
+  public void setAbsoluteUsedCapacity(float value) {
+    _set(NL, CapacityType.ABS_USED_CAP, value);
+  }
+
+  public void setAbsoluteUsedCapacity(String label, float value) {
+    _set(label, CapacityType.ABS_USED_CAP, value);
+  }
+
+  /* Capacity Getter and Setter */
+  public float getCapacity() {
+    return _get(NL, CapacityType.CAP);
+  }
+
+  public float getCapacity(String label) {
+    return _get(label, CapacityType.CAP);
+  }
+
+  public void setCapacity(float value) {
+    _set(NL, CapacityType.CAP, value);
+  }
+
+  public void setCapacity(String label, float value) {
+    _set(label, CapacityType.CAP, value);
+  }
+
+  /* Absolute Capacity Getter and Setter */
+  public float getAbsoluteCapacity() {
+    return _get(NL, CapacityType.ABS_CAP);
+  }
+
+  public float getAbsoluteCapacity(String label) {
+    return _get(label, CapacityType.ABS_CAP);
+  }
+
+  public void setAbsoluteCapacity(float value) {
+    _set(NL, CapacityType.ABS_CAP, value);
+  }
+
+  public void setAbsoluteCapacity(String label, float value) {
+    _set(label, CapacityType.ABS_CAP, value);
+  }
+
+  /* Maximum Capacity Getter and Setter */
+  public float getMaximumCapacity() {
+    return _get(NL, CapacityType.MAX_CAP);
+  }
+
+  public float getMaximumCapacity(String label) {
+    return _get(label, CapacityType.MAX_CAP);
+  }
+
+  public void setMaximumCapacity(float value) {
+    _set(NL, CapacityType.MAX_CAP, value);
+  }
+
+  public void setMaximumCapacity(String label, float value) {
+    _set(label, CapacityType.MAX_CAP, value);
+  }
+
+  /* Absolute Maximum Capacity Getter and Setter */
+  public float getAbsoluteMaximumCapacity() {
+    return _get(NL, CapacityType.ABS_MAX_CAP);
+  }
+
+  public float getAbsoluteMaximumCapacity(String label) {
+    return _get(label, CapacityType.ABS_MAX_CAP);
+  }
+
+  public void setAbsoluteMaximumCapacity(float value) {
+    _set(NL, CapacityType.ABS_MAX_CAP, value);
+  }
+
+  public void setAbsoluteMaximumCapacity(String label, float value) {
+    _set(label, CapacityType.ABS_MAX_CAP, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c94f071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java
new file mode 100644
index 0000000..89a5311
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestQueueCapacities {
+  private static final Log LOG = LogFactory.getLog(TestQueueCapacities.class);
+  private String suffix;
+
+  @Parameterized.Parameters
+  public static Collection<String[]> getParameters() {
+    return Arrays.asList(new String[][] { 
+        { "Capacity" },
+        { "AbsoluteCapacity" }, 
+        { "UsedCapacity" }, 
+        { "AbsoluteUsedCapacity" },
+        { "MaximumCapacity" }, 
+        { "AbsoluteMaximumCapacity" } });
+  }
+
+  public TestQueueCapacities(String suffix) {
+    this.suffix = suffix;
+  }
+
+  private static float get(QueueCapacities obj, String suffix,
+      String label) throws Exception {
+    return executeByName(obj, "get" + suffix, label, -1f);
+  }
+
+  private static void set(QueueCapacities obj, String suffix,
+      String label, float value) throws Exception {
+    executeByName(obj, "set" + suffix, label, value);
+  }
+
+  // Use reflection to avoid too much duplicated code
+  private static float executeByName(QueueCapacities obj, String methodName,
+      String label, float value) throws Exception {
+    // We have 4 kinds of methods
+    // 1. getXXX() : float
+    // 2. getXXX(label) : float
+    // 3. setXXX(float) : void
+    // 4. setXXX(label, float) : void
+    if (methodName.startsWith("get")) {
+      float result;
+      if (label == null) {
+        // 1.
+        Method method = QueueCapacities.class.getDeclaredMethod(methodName);
+        result = (Float) method.invoke(obj);
+      } else {
+        // 2.
+        Method method =
+            QueueCapacities.class.getDeclaredMethod(methodName, String.class);
+        result = (Float) method.invoke(obj, label);
+      }
+      return result;
+    } else {
+      if (label == null) {
+        // 3.
+        Method method =
+            QueueCapacities.class.getDeclaredMethod(methodName, Float.TYPE);
+        method.invoke(obj, value);
+      } else {
+        // 4.
+        Method method =
+            QueueCapacities.class.getDeclaredMethod(methodName, String.class,
+                Float.TYPE);
+        method.invoke(obj, label, value);
+      }
+      return -1f;
+    }
+  }
+
+  private void internalTestModifyAndRead(String label) throws Exception {
+    QueueCapacities qc = new QueueCapacities();
+
+    // First get returns 0 always
+    Assert.assertEquals(0f, get(qc, suffix, label), 1e-8);
+
+    // Set to 1, and check
+    set(qc, suffix, label, 1f);
+    Assert.assertEquals(1f, get(qc, suffix, label), 1e-8);
+
+    // Set to 2, and check
+    set(qc, suffix, label, 2f);
+    Assert.assertEquals(2f, get(qc, suffix, label), 1e-8);
+  }
+
+  void check(int mem, int cpu, Resource res) {
+    Assert.assertEquals(mem, res.getMemory());
+    Assert.assertEquals(cpu, res.getVirtualCores());
+  }
+
+  @Test
+  public void testModifyAndRead() throws Exception {
+    LOG.info("Test - " + suffix);
+    internalTestModifyAndRead(null);
+    internalTestModifyAndRead("label");
+  }
+}
\ No newline at end of file