You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2020/04/28 23:15:08 UTC

[hadoop] branch trunk updated: HADOOP-17010. Add queue capacity support for FairCallQueue (#1977)

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4202750  HADOOP-17010. Add queue capacity support for FairCallQueue (#1977)
4202750 is described below

commit 4202750040f91f8dcc218ecc7d3ccf81a8e68b2a
Author: lfengnan <lo...@gmail.com>
AuthorDate: Tue Apr 28 16:14:55 2020 -0700

    HADOOP-17010. Add queue capacity support for FairCallQueue (#1977)
---
 .../apache/hadoop/fs/CommonConfigurationKeys.java  |  3 ++
 .../org/apache/hadoop/ipc/CallQueueManager.java    | 56 +++++++++++++++++--
 .../java/org/apache/hadoop/ipc/FairCallQueue.java  | 25 +++++++--
 .../src/site/markdown/FairCallQueue.md             |  5 ++
 .../org/apache/hadoop/ipc/TestFairCallQueue.java   | 63 ++++++++++++++++++++++
 5 files changed, 143 insertions(+), 9 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 40ddfba..c08af39 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -114,6 +114,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
       "callqueue.overflow.trigger.failover";
   public static final boolean IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT =
       false;
+  /** Callqueue subqueue capacity weights. */
+  public static final String IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY =
+      "callqueue.capacity.weights";
 
   /**
    * IPC scheduler priority levels.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 81b7d34..53ac34b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.AbstractQueue;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
@@ -77,8 +78,10 @@ public class CallQueueManager<E extends Schedulable>
     int priorityLevels = parseNumLevels(namespace, conf);
     this.scheduler = createScheduler(schedulerClass, priorityLevels,
         namespace, conf);
+    int[] capacityWeights = parseCapacityWeights(priorityLevels,
+        namespace, conf);
     BlockingQueue<E> bq = createCallQueueInstance(backingClass,
-        priorityLevels, maxQueueSize, namespace, conf);
+        priorityLevels, maxQueueSize, namespace, capacityWeights, conf);
     this.clientBackOffEnabled = clientBackOffEnabled;
     this.serverFailOverEnabled = conf.getBoolean(
         namespace + "." +
@@ -146,13 +149,14 @@ public class CallQueueManager<E extends Schedulable>
 
   private <T extends BlockingQueue<E>> T createCallQueueInstance(
       Class<T> theClass, int priorityLevels, int maxLen, String ns,
-      Configuration conf) {
+      int[] capacityWeights, Configuration conf) {
 
     // Used for custom, configurable callqueues
     try {
       Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
-          int.class, String.class, Configuration.class);
-      return ctor.newInstance(priorityLevels, maxLen, ns, conf);
+          int.class, String.class, int[].class, Configuration.class);
+      return ctor.newInstance(priorityLevels, maxLen, ns,
+          capacityWeights, conf);
     } catch (RuntimeException e) {
       throw e;
     } catch (InvocationTargetException e) {
@@ -344,6 +348,47 @@ public class CallQueueManager<E extends Schedulable>
   }
 
   /**
+   * Read the weights of capacity in callqueue and pass the value to
+   * callqueue constructions.
+   */
+  private static int[] parseCapacityWeights(
+      int priorityLevels, String ns, Configuration conf) {
+    int[] weights = conf.getInts(ns + "." +
+      CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY);
+    if (weights.length == 0) {
+      weights = getDefaultQueueCapacityWeights(priorityLevels);
+    } else if (weights.length != priorityLevels) {
+      throw new IllegalArgumentException(
+          CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY + " must "
+              + "specify " + priorityLevels + " capacity weights: one for each "
+              + "priority level");
+    } else {
+      // only allow positive numbers
+      for (int w : weights) {
+        if (w <= 0) {
+          throw new IllegalArgumentException(
+              CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY +
+                  " only takes positive weights. " + w + " capacity weight " +
+                  "found");
+        }
+      }
+    }
+    return weights;
+  }
+
+  /**
+   * By default, queue capacity is the same for all priority levels.
+   *
+   * @param priorityLevels number of levels
+   * @return default weights
+   */
+  public static int[] getDefaultQueueCapacityWeights(int priorityLevels) {
+    int[] weights = new int[priorityLevels];
+    Arrays.fill(weights, 1);
+    return weights;
+  }
+
+  /**
    * Replaces active queue with the newly requested one and transfers
    * all calls to the newQ before returning.
    */
@@ -355,8 +400,9 @@ public class CallQueueManager<E extends Schedulable>
     this.scheduler.stop();
     RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
         ns, conf);
+    int[] capacityWeights = parseCapacityWeights(priorityLevels, ns, conf);
     BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
-        priorityLevels, maxSize, ns, conf);
+        priorityLevels, maxSize, ns, capacityWeights, conf);
 
     // Our current queue becomes the old queue
     BlockingQueue<E> oldQ = putRef.get();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index d15a710..939149f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -80,17 +80,27 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
 
   /* Failover if queue is filled up */
   private boolean serverFailOverEnabled;
+
+  @VisibleForTesting
+  public FairCallQueue(int priorityLevels, int capacity, String ns,
+      Configuration conf) {
+    this(priorityLevels, capacity, ns,
+        CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels), conf);
+  }
+
   /**
    * Create a FairCallQueue.
    * @param capacity the total size of all sub-queues
    * @param ns the prefix to use for configuration
+   * @param capacityWeights the weights array for capacity allocation
+   *                        among subqueues
    * @param conf the configuration to read from
    * Notes: Each sub-queue has a capacity of `capacity / numSubqueues`.
    * The first or the highest priority sub-queue has an excess capacity
    * of `capacity % numSubqueues`
    */
   public FairCallQueue(int priorityLevels, int capacity, String ns,
-      Configuration conf) {
+      int[] capacityWeights, Configuration conf) {
     if(priorityLevels < 1) {
       throw new IllegalArgumentException("Number of Priority Levels must be " +
           "at least 1");
@@ -101,11 +111,18 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
 
     this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
     this.overflowedCalls = new ArrayList<AtomicLong>(numQueues);
-    int queueCapacity = capacity / numQueues;
-    int capacityForFirstQueue = queueCapacity + (capacity % numQueues);
+    int totalWeights = 0;
+    for (int i = 0; i < capacityWeights.length; i++) {
+      totalWeights += capacityWeights[i];
+    }
+    int residueCapacity = capacity % totalWeights;
+    int unitCapacity = capacity / totalWeights;
+    int queueCapacity;
     for(int i=0; i < numQueues; i++) {
+      queueCapacity = unitCapacity * capacityWeights[i];
       if (i == 0) {
-        this.queues.add(new LinkedBlockingQueue<E>(capacityForFirstQueue));
+        this.queues.add(new LinkedBlockingQueue<E>(
+            queueCapacity + residueCapacity));
       } else {
         this.queues.add(new LinkedBlockingQueue<E>(queueCapacity));
       }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md
index 22ac05a..887d305 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md
@@ -126,6 +126,7 @@ omitted.
 |:---- |:---- |:---- |:--- |
 | backoff.enable | General | Whether or not to enable client backoff when a queue is full. | false |
 | callqueue.impl | General | The fully qualified name of a class to use as the implementation of a call queue. Use `org.apache.hadoop.ipc.FairCallQueue` for the Fair Call Queue. | `java.util.concurrent.LinkedBlockingQueue` (FIFO queue) |
+| callqueue.capacity.weights | General | The capacity allocation weights among all subqueues. A postive int array whose length is equal to the `scheduler.priority.levels` is expected where each int is the relative weight out of total capacity. i.e. if a queue with capacity weight `w`, its queue capacity is `capacity * w/sum(weights)` |
 | scheduler.impl | General | The fully qualified name of a class to use as the implementation of the scheduler. Use `org.apache.hadoop.ipc.DecayRpcScheduler` in conjunction with the Fair Call Queue. | `org.apache.hadoop.ipc.DefaultRpcScheduler` (no-op scheduler) <br/> If using FairCallQueue, defaults to `org.apache.hadoop.ipc.DecayRpcScheduler` |
 | scheduler.priority.levels | RpcScheduler, CallQueue | How many priority levels to use within the scheduler and call queue. | 4 |
 | faircallqueue.multiplexer.weights | WeightedRoundRobinMultiplexer | How much weight to give to each priority queue. This should be a comma-separated list of length equal to the number of priority levels. | Weights descend by a factor of 2 (e.g., for 4 levels: `8,4,2,1`) |
@@ -152,6 +153,10 @@ processed.
          <value>org.apache.hadoop.ipc.FairCallQueue</value>
     </property>
     <property>
+         <name>ipc.8020.callqueue.capacity.weights</name>
+         <value>7,3</value>
+    </property>
+    <property>
          <name>ipc.8020.scheduler.impl</name>
          <value>org.apache.hadoop.ipc.DecayRpcScheduler</value>
     </property>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index e6a5f5e..f478957 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -104,6 +104,9 @@ public class TestFairCallQueue {
     assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
     fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns", conf);
     assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
+    fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns",
+        new int[]{7, 6, 5, 4, 3, 2, 1}, conf);
+    assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
   }
 
   @Test
@@ -157,6 +160,66 @@ public class TestFairCallQueue {
     assertNull(fcq.poll());
   }
 
+  @Test
+  public void testQueueCapacity() {
+    int numQueues = 2;
+    int capacity = 4;
+    Configuration conf = new Configuration();
+    List<Schedulable> calls = new ArrayList<>();
+
+    // default weights i.e. all queues share capacity
+    fcq = new FairCallQueue<Schedulable>(numQueues, 4, "ns", conf);
+    FairCallQueue<Schedulable> fcq1 = new FairCallQueue<Schedulable>(
+        numQueues, capacity, "ns", new int[]{3, 1}, conf);
+
+    for (int i=0; i < capacity; i++) {
+      Schedulable call = mockCall("u", i%2);
+      calls.add(call);
+      fcq.add(call);
+      fcq1.add(call);
+
+      call = mockCall("u", (i++)%2);
+      calls.add(call);
+      fcq.add(call);
+      fcq1.add(call);
+    }
+
+    final AtomicInteger currentIndex = new AtomicInteger();
+    fcq.setMultiplexer(new RpcMultiplexer(){
+      @Override
+      public int getAndAdvanceCurrentIndex() {
+        return currentIndex.get();
+      }
+    });
+    fcq1.setMultiplexer(new RpcMultiplexer(){
+      @Override
+      public int getAndAdvanceCurrentIndex() {
+        return currentIndex.get();
+      }
+    });
+
+    // either queue will have two calls
+    //    v
+    // 0  2
+    // 1  3
+    currentIndex.set(1);
+    assertSame(calls.get(2), fcq.poll());
+    assertSame(calls.get(3), fcq.poll());
+    assertSame(calls.get(0), fcq.poll());
+    assertSame(calls.get(1), fcq.poll());
+
+    // queues with different number of calls
+    //    v
+    // 0  3
+    // 1
+    // 2
+    currentIndex.set(1);
+    assertSame(calls.get(3), fcq1.poll());
+    assertSame(calls.get(0), fcq1.poll());
+    assertSame(calls.get(1), fcq1.poll());
+    assertSame(calls.get(2), fcq1.poll());
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testInsertionWithFailover() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org