You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2020/04/28 23:15:08 UTC
[hadoop] branch trunk updated: HADOOP-17010. Add queue capacity
support for FairCallQueue (#1977)
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4202750 HADOOP-17010. Add queue capacity support for FairCallQueue (#1977)
4202750 is described below
commit 4202750040f91f8dcc218ecc7d3ccf81a8e68b2a
Author: lfengnan <lo...@gmail.com>
AuthorDate: Tue Apr 28 16:14:55 2020 -0700
HADOOP-17010. Add queue capacity support for FairCallQueue (#1977)
---
.../apache/hadoop/fs/CommonConfigurationKeys.java | 3 ++
.../org/apache/hadoop/ipc/CallQueueManager.java | 56 +++++++++++++++++--
.../java/org/apache/hadoop/ipc/FairCallQueue.java | 25 +++++++--
.../src/site/markdown/FairCallQueue.md | 5 ++
.../org/apache/hadoop/ipc/TestFairCallQueue.java | 63 ++++++++++++++++++++++
5 files changed, 143 insertions(+), 9 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 40ddfba..c08af39 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -114,6 +114,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
"callqueue.overflow.trigger.failover";
public static final boolean IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT =
false;
+ /** Callqueue subqueue capacity weights. */
+ public static final String IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY =
+ "callqueue.capacity.weights";
/**
* IPC scheduler priority levels.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 81b7d34..53ac34b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.AbstractQueue;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
@@ -77,8 +78,10 @@ public class CallQueueManager<E extends Schedulable>
int priorityLevels = parseNumLevels(namespace, conf);
this.scheduler = createScheduler(schedulerClass, priorityLevels,
namespace, conf);
+ int[] capacityWeights = parseCapacityWeights(priorityLevels,
+ namespace, conf);
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
- priorityLevels, maxQueueSize, namespace, conf);
+ priorityLevels, maxQueueSize, namespace, capacityWeights, conf);
this.clientBackOffEnabled = clientBackOffEnabled;
this.serverFailOverEnabled = conf.getBoolean(
namespace + "." +
@@ -146,13 +149,14 @@ public class CallQueueManager<E extends Schedulable>
private <T extends BlockingQueue<E>> T createCallQueueInstance(
Class<T> theClass, int priorityLevels, int maxLen, String ns,
- Configuration conf) {
+ int[] capacityWeights, Configuration conf) {
// Used for custom, configurable callqueues
try {
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
- int.class, String.class, Configuration.class);
- return ctor.newInstance(priorityLevels, maxLen, ns, conf);
+ int.class, String.class, int[].class, Configuration.class);
+ return ctor.newInstance(priorityLevels, maxLen, ns,
+ capacityWeights, conf);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
@@ -344,6 +348,47 @@ public class CallQueueManager<E extends Schedulable>
}
/**
+ * Read the capacity weights of the callqueue's subqueues from the
+ * configuration so that they can be passed to the callqueue constructor.
+ */
+ private static int[] parseCapacityWeights(
+ int priorityLevels, String ns, Configuration conf) {
+ int[] weights = conf.getInts(ns + "." +
+ CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY);
+ if (weights.length == 0) {
+ weights = getDefaultQueueCapacityWeights(priorityLevels);
+ } else if (weights.length != priorityLevels) {
+ throw new IllegalArgumentException(
+ CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY + " must "
+ + "specify " + priorityLevels + " capacity weights: one for each "
+ + "priority level");
+ } else {
+ // only allow positive numbers
+ for (int w : weights) {
+ if (w <= 0) {
+ throw new IllegalArgumentException(
+ CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY +
+ " only takes positive weights. " + w + " capacity weight " +
+ "found");
+ }
+ }
+ }
+ return weights;
+ }
+
+ /**
+ * By default, queue capacity is the same for all priority levels.
+ *
+ * @param priorityLevels number of levels
+ * @return default weights
+ */
+ public static int[] getDefaultQueueCapacityWeights(int priorityLevels) {
+ int[] weights = new int[priorityLevels];
+ Arrays.fill(weights, 1);
+ return weights;
+ }
+
+ /**
* Replaces active queue with the newly requested one and transfers
* all calls to the newQ before returning.
*/
@@ -355,8 +400,9 @@ public class CallQueueManager<E extends Schedulable>
this.scheduler.stop();
RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
ns, conf);
+ int[] capacityWeights = parseCapacityWeights(priorityLevels, ns, conf);
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
- priorityLevels, maxSize, ns, conf);
+ priorityLevels, maxSize, ns, capacityWeights, conf);
// Our current queue becomes the old queue
BlockingQueue<E> oldQ = putRef.get();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index d15a710..939149f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -80,17 +80,27 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
/* Failover if queue is filled up */
private boolean serverFailOverEnabled;
+
+ @VisibleForTesting
+ public FairCallQueue(int priorityLevels, int capacity, String ns,
+ Configuration conf) {
+ this(priorityLevels, capacity, ns,
+ CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels), conf);
+ }
+
/**
* Create a FairCallQueue.
* @param capacity the total size of all sub-queues
* @param ns the prefix to use for configuration
+ * @param capacityWeights the weights array for capacity allocation
+ * among subqueues
* @param conf the configuration to read from
* Notes: Sub-queue `i` has a capacity of `unit * capacityWeights[i]`, where
* `unit = capacity / sum(capacityWeights)`. The first or the highest priority
* sub-queue has an excess capacity of `capacity % sum(capacityWeights)`
*/
public FairCallQueue(int priorityLevels, int capacity, String ns,
- Configuration conf) {
+ int[] capacityWeights, Configuration conf) {
if(priorityLevels < 1) {
throw new IllegalArgumentException("Number of Priority Levels must be " +
"at least 1");
@@ -101,11 +111,18 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
this.overflowedCalls = new ArrayList<AtomicLong>(numQueues);
- int queueCapacity = capacity / numQueues;
- int capacityForFirstQueue = queueCapacity + (capacity % numQueues);
+ int totalWeights = 0;
+ for (int i = 0; i < capacityWeights.length; i++) {
+ totalWeights += capacityWeights[i];
+ }
+ int residueCapacity = capacity % totalWeights;
+ int unitCapacity = capacity / totalWeights;
+ int queueCapacity;
for(int i=0; i < numQueues; i++) {
+ queueCapacity = unitCapacity * capacityWeights[i];
if (i == 0) {
- this.queues.add(new LinkedBlockingQueue<E>(capacityForFirstQueue));
+ this.queues.add(new LinkedBlockingQueue<E>(
+ queueCapacity + residueCapacity));
} else {
this.queues.add(new LinkedBlockingQueue<E>(queueCapacity));
}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md
index 22ac05a..887d305 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md
@@ -126,6 +126,7 @@ omitted.
|:---- |:---- |:---- |:--- |
| backoff.enable | General | Whether or not to enable client backoff when a queue is full. | false |
| callqueue.impl | General | The fully qualified name of a class to use as the implementation of a call queue. Use `org.apache.hadoop.ipc.FairCallQueue` for the Fair Call Queue. | `java.util.concurrent.LinkedBlockingQueue` (FIFO queue) |
+| callqueue.capacity.weights | General | The capacity allocation weights among all subqueues. A positive int array whose length is equal to `scheduler.priority.levels` is expected, where each int is the relative weight out of the total capacity. For example, a queue with capacity weight `w` has a queue capacity of `capacity * w/sum(weights)`. | All weights are 1, i.e. capacity is shared equally (e.g., for 4 levels: `1,1,1,1`) |
| scheduler.impl | General | The fully qualified name of a class to use as the implementation of the scheduler. Use `org.apache.hadoop.ipc.DecayRpcScheduler` in conjunction with the Fair Call Queue. | `org.apache.hadoop.ipc.DefaultRpcScheduler` (no-op scheduler) <br/> If using FairCallQueue, defaults to `org.apache.hadoop.ipc.DecayRpcScheduler` |
| scheduler.priority.levels | RpcScheduler, CallQueue | How many priority levels to use within the scheduler and call queue. | 4 |
| faircallqueue.multiplexer.weights | WeightedRoundRobinMultiplexer | How much weight to give to each priority queue. This should be a comma-separated list of length equal to the number of priority levels. | Weights descend by a factor of 2 (e.g., for 4 levels: `8,4,2,1`) |
@@ -152,6 +153,10 @@ processed.
<value>org.apache.hadoop.ipc.FairCallQueue</value>
</property>
<property>
+ <name>ipc.8020.callqueue.capacity.weights</name>
+ <value>7,3</value>
+ </property>
+ <property>
<name>ipc.8020.scheduler.impl</name>
<value>org.apache.hadoop.ipc.DecayRpcScheduler</value>
</property>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index e6a5f5e..f478957 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -104,6 +104,9 @@ public class TestFairCallQueue {
assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns", conf);
assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
+ fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns",
+ new int[]{7, 6, 5, 4, 3, 2, 1}, conf);
+ assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
}
@Test
@@ -157,6 +160,66 @@ public class TestFairCallQueue {
assertNull(fcq.poll());
}
+ @Test
+ public void testQueueCapacity() {
+ int numQueues = 2;
+ int capacity = 4;
+ Configuration conf = new Configuration();
+ List<Schedulable> calls = new ArrayList<>();
+
+ // default weights i.e. all queues share capacity
+ fcq = new FairCallQueue<Schedulable>(numQueues, 4, "ns", conf);
+ FairCallQueue<Schedulable> fcq1 = new FairCallQueue<Schedulable>(
+ numQueues, capacity, "ns", new int[]{3, 1}, conf);
+
+ for (int i=0; i < capacity; i++) {
+ Schedulable call = mockCall("u", i%2);
+ calls.add(call);
+ fcq.add(call);
+ fcq1.add(call);
+
+ call = mockCall("u", (i++)%2);
+ calls.add(call);
+ fcq.add(call);
+ fcq1.add(call);
+ }
+
+ final AtomicInteger currentIndex = new AtomicInteger();
+ fcq.setMultiplexer(new RpcMultiplexer(){
+ @Override
+ public int getAndAdvanceCurrentIndex() {
+ return currentIndex.get();
+ }
+ });
+ fcq1.setMultiplexer(new RpcMultiplexer(){
+ @Override
+ public int getAndAdvanceCurrentIndex() {
+ return currentIndex.get();
+ }
+ });
+
+ // either queue will have two calls
+ // v
+ // 0 2
+ // 1 3
+ currentIndex.set(1);
+ assertSame(calls.get(2), fcq.poll());
+ assertSame(calls.get(3), fcq.poll());
+ assertSame(calls.get(0), fcq.poll());
+ assertSame(calls.get(1), fcq.poll());
+
+ // queues with different number of calls
+ // v
+ // 0 3
+ // 1
+ // 2
+ currentIndex.set(1);
+ assertSame(calls.get(3), fcq1.poll());
+ assertSame(calls.get(0), fcq1.poll());
+ assertSame(calls.get(1), fcq1.poll());
+ assertSame(calls.get(2), fcq1.poll());
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testInsertionWithFailover() {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org