You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/06/11 04:56:29 UTC

[43/50] hbase git commit: HBASE-15994 Allow selection of RpcSchedulers. Adds logging by the RpcExecutors of their run configs. Adds a FifoRpcSchedulerFactory so you can try the Fifo scheduler.

HBASE-15994 Allow selection of RpcSchedulers
Adds logging by the RpcExecutors of their run configs
Adds a FifoRpcSchedulerFactory so you can try Fifo scheduler.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/031b7450
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/031b7450
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/031b7450

Branch: refs/heads/hbase-12439
Commit: 031b745001c7d54ef13f3cd6d725d0eb78095785
Parents: 407aa4d
Author: stack <st...@apache.org>
Authored: Wed Jun 8 20:23:11 2016 -0700
Committer: stack <st...@apache.org>
Committed: Wed Jun 8 20:23:11 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/BalancedQueueRpcExecutor.java     |  4 ++
 .../hadoop/hbase/ipc/FifoRpcScheduler.java      |  5 ++
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    | 54 ++++++++-------
 .../regionserver/FifoRpcSchedulerFactory.java   | 47 +++++++++++++
 .../hbase/regionserver/RpcSchedulerFactory.java |  4 +-
 .../regionserver/SimpleRpcSchedulerFactory.java |  6 +-
 .../regionserver/TestRpcSchedulerFactory.java   | 71 ++++++++++++++++++++
 7 files changed, 161 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/031b7450/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index e4205eb..3505221 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
 @InterfaceStability.Evolving
 public class BalancedQueueRpcExecutor extends RpcExecutor {
+  private static final Log LOG = LogFactory.getLog(BalancedQueueRpcExecutor.class);
 
   protected final List<BlockingQueue<CallRunner>> queues;
   private final QueueBalancer balancer;
@@ -62,6 +65,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
     queues = new ArrayList<BlockingQueue<CallRunner>>(numQueues);
     this.balancer = getBalancer(numQueues);
     initializeQueues(numQueues, queueClass, initargs);
+    LOG.debug(name + " queues=" + numQueues + " handlerCount=" + handlerCount);
   }
 
   protected void initializeQueues(final int numQueues,

http://git-wip-us.apache.org/repos/asf/hbase/blob/031b7450/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index ee36f3f..70d903a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
 
@@ -32,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * This can be used for HMaster, where no prioritization is needed.
  */
 public class FifoRpcScheduler extends RpcScheduler {
+  private static final Log LOG = LogFactory.getLog(FifoRpcScheduler.class);
   private final int handlerCount;
   private final int maxQueueLength;
   private final AtomicInteger queueSize = new AtomicInteger(0);
@@ -41,6 +44,8 @@ public class FifoRpcScheduler extends RpcScheduler {
     this.handlerCount = handlerCount;
     this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+    LOG.info("Using " + this.getClass().getSimpleName() + " as user call queue; handlerCount=" +
+        handlerCount + "; maxQueueLength=" + maxQueueLength);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/031b7450/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 431aeeb..d9d61c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -34,8 +34,11 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
 
 /**
- * A scheduler that maintains isolated handler pools for general,
- * high-priority, and replication requests.
+ * The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
+ * high-priority ('priority'), and replication ('replication') requests. Default behavior is to
+ * balance the requests across handlers. Add configs to enable balancing by read vs writes, etc.
+ * See below article for explanation of options.
+ * @see <a href="http://blog.cloudera.com/blog/2014/12/new-in-cdh-5-2-improvements-for-running-multiple-workloads-on-a-single-hbase-cluster/">Overview on Request Queuing</a>
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving
@@ -49,7 +52,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
   public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
       "hbase.ipc.server.callqueue.handler.factor";
 
-  /** If set to 'deadline', uses a priority queue and deprioritize long-running scans */
+  /** If set to 'deadline', the default, uses a priority queue and deprioritizes long-running scans
+   */
   public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
   public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
   public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
@@ -190,54 +194,58 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
 
     float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
     int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
-
-    LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);
-
+    LOG.info("Using " + callQueueType + " as user call queue; numCallQueues=" + numCallQueues +
+        "; callQReadShare=" + callqReadShare + ", callQScanShare=" + callqScanShare);
     if (numCallQueues > 1 && callqReadShare > 0) {
       // multiple read/write queues
-      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+      if (isDeadlineQueueType(callQueueType)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
-        callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
+        callExecutor = new RWQueueRpcExecutor("RWQ.default", handlerCount, numCallQueues,
             callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
             BoundedPriorityBlockingQueue.class, callPriority);
       } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
         Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
           codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
-        callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount,
+        callExecutor = new RWQueueRpcExecutor("RWQ.default", handlerCount,
           numCallQueues, callqReadShare, callqScanShare,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
       } else {
-        callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
+        callExecutor = new RWQueueRpcExecutor("RWQ.default", handlerCount, numCallQueues,
           callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
       }
     } else {
       // multiple queues
-      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+      if (isDeadlineQueueType(callQueueType)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
-        callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
-          conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
+        callExecutor =
+          new BalancedQueueRpcExecutor("BalancedQ.default", handlerCount, numCallQueues,
+            conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
       } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
-        callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues,
-          conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
-          codelTargetDelay, codelInterval, codelLifoThreshold,
-          numGeneralCallsDropped, numLifoModeSwitches);
+        callExecutor =
+          new BalancedQueueRpcExecutor("BalancedQ.default", handlerCount, numCallQueues,
+            conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
+            codelTargetDelay, codelInterval, codelLifoThreshold,
+            numGeneralCallsDropped, numLifoModeSwitches);
       } else {
-        callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount,
+        callExecutor = new BalancedQueueRpcExecutor("BalancedQ.default", handlerCount,
             numCallQueues, maxQueueLength, conf, abortable);
       }
     }
-
     // Create 2 queues to help priorityExecutor be more scalable.
     this.priorityExecutor = priorityHandlerCount > 0 ?
-        new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) :
-        null;
-
+      new BalancedQueueRpcExecutor("BalancedQ.priority", priorityHandlerCount, 2,
+          maxPriorityQueueLength):
+      null;
    this.replicationExecutor =
-     replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
+     replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("BalancedQ.replication",
        replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
   }
 
+  private static boolean isDeadlineQueueType(final String callQueueType) {
+    return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); // true only for "deadline"
+  }
+
   public SimpleRpcScheduler(
 	      Configuration conf,
 	      int handlerCount,

http://git-wip-us.apache.org/repos/asf/hbase/blob/031b7450/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FifoRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FifoRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FifoRpcSchedulerFactory.java
new file mode 100644
index 0000000..f4b51ba
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FifoRpcSchedulerFactory.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+
+/**
+ * Creates a {@link FifoRpcScheduler}; handler count is read from REGION_SERVER_HANDLER_COUNT.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FifoRpcSchedulerFactory implements RpcSchedulerFactory {
+  @Override
+  public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
+    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+    return new FifoRpcScheduler(conf, handlerCount); // priority and server are not used
+  }
+
+  @Deprecated
+  @Override
+  public RpcScheduler create(Configuration conf, PriorityFunction priority) {
+    return create(conf, priority, null); // delegates with a null Abortable
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/031b7450/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
index f554781..7bc59da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.ipc.RpcScheduler;
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving
 public interface RpcSchedulerFactory {
-
   /**
    * Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
    */
@@ -39,5 +38,4 @@ public interface RpcSchedulerFactory {
 
   @Deprecated
   RpcScheduler create(Configuration conf, PriorityFunction priority);
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/031b7450/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index 743c5bb..92462c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -27,11 +27,11 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.RpcScheduler;
 import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
 
-/** Constructs a {@link SimpleRpcScheduler}. */
+/** Constructs a {@link SimpleRpcScheduler}.
+ */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving
 public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
-
   @Override
   @Deprecated
   public RpcScheduler create(Configuration conf, PriorityFunction priority) {
@@ -42,7 +42,6 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
   public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
     int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
         HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
-
     return new SimpleRpcScheduler(
       conf,
       handlerCount,
@@ -54,5 +53,4 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
       server,
       HConstants.QOS_THRESHOLD);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/031b7450/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRpcSchedulerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRpcSchedulerFactory.java
new file mode 100644
index 0000000..9366c54
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRpcSchedulerFactory.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+/**
+ * Sanity test: checks that each {@link RpcSchedulerFactory} implementation exercised here
+ * creates the {@link RpcScheduler} class it is named for.
+ */
+@Category(SmallTests.class)
+public class TestRpcSchedulerFactory {
+  @Rule public TestName testName = new TestName();
+  @ClassRule public static TestRule timeout =
+      CategoryBasedTimeout.forClass(TestRpcSchedulerFactory.class);
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = HBaseConfiguration.create();
+  }
+
+  @Test
+  public void testRWQ() {
+    // Set some configs just to see how it changes the scheduler. Can't assert the settings had
+    // an effect. Just eyeball the log.
+    this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5);
+    this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5);
+    this.conf.setDouble(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5);
+    RpcSchedulerFactory factory = new SimpleRpcSchedulerFactory();
+    RpcScheduler rpcScheduler = factory.create(this.conf, null, null);
+    assertTrue(rpcScheduler.getClass().equals(SimpleRpcScheduler.class));
+  }
+
+  @Test
+  public void testFifo() {
+    RpcSchedulerFactory factory = new FifoRpcSchedulerFactory();
+    RpcScheduler rpcScheduler = factory.create(this.conf, null, null);
+    assertTrue(rpcScheduler.getClass().equals(FifoRpcScheduler.class));
+  }
+}
\ No newline at end of file