You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2014/07/15 00:09:24 UTC

[2/3] git commit: HBASE-11513 Combine SingleMultiple Queue RpcExecutor into a single class (Jesse Yates)

HBASE-11513 Combine SingleMultiple Queue RpcExecutor into a single class (Jesse Yates)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4824b0de
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4824b0de
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4824b0de

Branch: refs/heads/master
Commit: 4824b0dea721790f7ff14af317620d26d931a18e
Parents: c61676a
Author: Andrew Purtell <ap...@apache.org>
Authored: Mon Jul 14 14:39:59 2014 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Jul 14 15:05:06 2014 -0700

----------------------------------------------------------------------
 .../hbase/ipc/BalancedQueueRpcExecutor.java     | 127 +++++++++++++++++++
 .../hbase/ipc/MultipleQueueRpcExecutor.java     |  82 ------------
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  27 ++--
 .../hbase/ipc/SingleQueueRpcExecutor.java       |  66 ----------
 4 files changed, 136 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4824b0de/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
new file mode 100644
index 0000000..7cf2101
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
+ * efficient with a single queue via an inlinable queue balancing mechanism.
+ */
+@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
+@InterfaceStability.Evolving
+public class BalancedQueueRpcExecutor extends RpcExecutor {
+
+  protected final List<BlockingQueue<CallRunner>> queues;
+  private QueueBalancer balancer;
+
+  public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final int maxQueueLength) {
+    this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
+  }
+
+  public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+    super(name, Math.max(handlerCount, numQueues));
+    queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
+    this.balancer = getBalancer(numQueues);
+    initializeQueues(numQueues, queueClass, initargs);
+  }
+
+  protected void initializeQueues(final int numQueues,
+      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
+    for (int i = 0; i < numQueues; ++i) {
+      queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
+    }
+  }
+
+  @Override
+  public void dispatch(final CallRunner callTask) throws InterruptedException {
+    int queueIndex = balancer.getNextQueue();
+    queues.get(queueIndex).put(callTask);
+  }
+
+  @Override
+  public int getQueueLength() {
+    int length = 0;
+    for (final BlockingQueue<CallRunner> queue : queues) {
+      length += queue.size();
+    }
+    return length;
+  }
+
+  @Override
+  public List<BlockingQueue<CallRunner>> getQueues() {
+    return queues;
+  }
+
+  private static abstract class QueueBalancer {
+    /**
+     * @return the index of the next queue to which a request should be inserted
+     */
+    public abstract int getNextQueue();
+  }
+
+  public static QueueBalancer getBalancer(int queueSize) {
+    Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
+    if (queueSize == 1) {
+      return ONE_QUEUE;
+    } else {
+      return new RandomQueueBalancer(queueSize);
+    }
+  }
+
+  /**
+   * All requests go to the first queue, at index 0
+   */
+  private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
+
+    @Override
+    public int getNextQueue() {
+      return 0;
+    }
+  };
+
+  /**
+   * Queue balancer that just randomly selects a queue in the range [0, num queues).
+   */
+  private static class RandomQueueBalancer extends QueueBalancer {
+    private int queueSize;
+    private Random random;
+
+    public RandomQueueBalancer(int queueSize) {
+      this.queueSize = queueSize;
+      this.random = new Random();
+    }
+
+    public int getNextQueue() {
+      return random.nextInt(queueSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4824b0de/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
deleted file mode 100644
index 71ddfa6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MultipleQueueRpcExecutor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-
-/**
- * RPC Executor that dispatch the requests on multiple queues.
- * Each handler has its own queue and there is no stealing.
- */
-@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
-@InterfaceStability.Evolving
-public class MultipleQueueRpcExecutor extends RpcExecutor {
-  protected final List<BlockingQueue<CallRunner>> queues;
-  protected final Random balancer = new Random();
-
-  public MultipleQueueRpcExecutor(final String name, final int handlerCount,
-      final int numQueues, final int maxQueueLength) {
-    this(name, handlerCount, numQueues, LinkedBlockingQueue.class, maxQueueLength);
-  }
-
-  public MultipleQueueRpcExecutor(final String name, final int handlerCount,
-      final int numQueues,
-      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
-    super(name, Math.max(handlerCount, numQueues));
-    queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
-    initializeQueues(numQueues, queueClass, initargs);
-  }
-
-  protected void initializeQueues(final int numQueues,
-      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
-    for (int i = 0; i < numQueues; ++i) {
-      queues.add((BlockingQueue<CallRunner>)
-        ReflectionUtils.newInstance(queueClass, initargs));
-    }
-  }
-
-  @Override
-  public void dispatch(final CallRunner callTask) throws InterruptedException {
-    int queueIndex = balancer.nextInt(queues.size());
-    queues.get(queueIndex).put(callTask);
-  }
-
-  @Override
-  public int getQueueLength() {
-    int length = 0;
-    for (final BlockingQueue<CallRunner> queue: queues) {
-      length += queue.size();
-    }
-    return length;
-  }
-
-  @Override
-  protected List<BlockingQueue<CallRunner>> getQueues() {
-    return queues;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4824b0de/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 4b46595..953bc36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -127,33 +127,24 @@ public class SimpleRpcScheduler extends RpcScheduler {
         callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
             callqReadShare, maxQueueLength);
       }
-    } else if (numCallQueues > 1) {
+    } else {
       // multiple queues
       if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
-        callExecutor = new MultipleQueueRpcExecutor("default", handlerCount, numCallQueues,
+        callExecutor = new BalancedQueueRpcExecutor("default", handlerCount, numCallQueues,
             BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
       } else {
-        callExecutor = new MultipleQueueRpcExecutor("default", handlerCount,
+        callExecutor = new BalancedQueueRpcExecutor("default", handlerCount,
             numCallQueues, maxQueueLength);
       }
-    } else {
-      // Single queue
-      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
-        CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
-        callExecutor = new SingleQueueRpcExecutor("default", handlerCount,
-            BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
-      } else {
-        callExecutor = new SingleQueueRpcExecutor("default", handlerCount, maxQueueLength);
-      }
     }
 
-    this.priorityExecutor = priorityHandlerCount > 0
-        ? new SingleQueueRpcExecutor("Priority", priorityHandlerCount, maxQueueLength)
-        : null;
-    this.replicationExecutor = replicationHandlerCount > 0
-        ? new SingleQueueRpcExecutor("Replication", replicationHandlerCount, maxQueueLength)
-        : null;
+   this.priorityExecutor =
+     priorityHandlerCount > 0 ? new BalancedQueueRpcExecutor("Priority", priorityHandlerCount,
+       1, maxQueueLength) : null;
+   this.replicationExecutor =
+     replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
+       replicationHandlerCount, 1, maxQueueLength) : null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/4824b0de/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
deleted file mode 100644
index b94b14b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SingleQueueRpcExecutor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * RPC Executor that uses a single queue for all the requests.
- */
-@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
-@InterfaceStability.Evolving
-public class SingleQueueRpcExecutor extends RpcExecutor {
-  private final BlockingQueue<CallRunner> queue;
-
-  public SingleQueueRpcExecutor(final String name, final int handlerCount,
-      final int maxQueueLength) {
-    this(name, handlerCount, LinkedBlockingQueue.class, maxQueueLength);
-  }
-
-  public SingleQueueRpcExecutor(final String name, final int handlerCount,
-      final Class<? extends BlockingQueue> queueClass, Object... initargs) {
-    super(name, handlerCount);
-    queue = (BlockingQueue<CallRunner>)ReflectionUtils.newInstance(queueClass, initargs);
-  }
-
-  @Override
-  public void dispatch(final CallRunner callTask) throws InterruptedException {
-    queue.put(callTask);
-  }
-
-  @Override
-  public int getQueueLength() {
-    return queue.size();
-  }
-
-  @Override
-  protected List<BlockingQueue<CallRunner>> getQueues() {
-    List<BlockingQueue<CallRunner>> list = new ArrayList<BlockingQueue<CallRunner>>(1);
-    list.add(queue);
-    return list;
-  }
-}