You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/06/15 00:41:26 UTC

hbase git commit: HBASE-16023 Fastpath for the FIFO rpcscheduler Adds an executor that does balanced queue and fast path handing off requests directly to waiting handlers if any present. Idea taken from Apache Kudu (incubating). See https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h

Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 06762cd4d -> 8c46caacc


HBASE-16023 Fastpath for the FIFO rpcscheduler. Adds an executor that balances requests across its queues and has a fast path that hands requests directly to waiting handlers, if any are present. Idea taken from Apache Kudu (incubating). See https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h

M hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
 Refactor which makes a Handler type. Put all 'handler' stuff inside this
 new type. Also make it so a subclass can provide its own Handler type.

M hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
 Name the handler threads for their type so we can tell whether configs
 are having an effect.

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8c46caac
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8c46caac
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8c46caac

Branch: refs/heads/branch-1.3
Commit: 8c46caacc93b73ba040155e47b131a353d57550c
Parents: 06762cd
Author: stack <st...@apache.org>
Authored: Tue Jun 14 11:18:34 2016 -0700
Committer: stack <st...@apache.org>
Committed: Tue Jun 14 17:41:15 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/BalancedQueueRpcExecutor.java     |   5 +-
 ...ifoWithFastPathBalancedQueueRpcExecutor.java | 116 ++++++++++++++
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    | 160 ++++++++++++-------
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  47 +++---
 .../hbase/ipc/TestSimpleRpcScheduler.java       |   4 +-
 5 files changed, 247 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8c46caac/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index e4205eb..a31c8b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still remains
- * efficient with a single queue via an inlinable queue balancing mechanism.
+ * efficient with a single queue via an inlinable queue balancing mechanism. Defaults to FIFO but
+ * you can pass an alternate queue class to use.
  */
 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
 @InterfaceStability.Evolving
@@ -99,4 +100,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
   public List<BlockingQueue<CallRunner>> getQueues() {
     return queues;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8c46caac/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
new file mode 100644
index 0000000..1951dd0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * FIFO balanced queue executor with a fastpath. Because this is FIFO, it has no respect for
+ * ordering so a fast path skipping the queuing of Calls if a Handler is available, is possible.
+ * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling
+ * rather than let them go cold and lose context. Idea taken from Apache Kudu (incubating). See
+ * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
+ */
+@InterfaceAudience.Private
+public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
+  // Depends on default behavior of BalancedQueueRpcExecutor being FIFO!
+
+  /*
+   * Stack of Handlers waiting for work.
+   */
+  private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
+
+  public FifoWithFastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
+      final int numQueues, final int maxQueueLength, final Configuration conf,
+      final Abortable abortable) {
+    super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
+        maxQueueLength);
+  }
+
+  @Override
+  protected Handler getHandler(String name, double handlerFailureThreshhold,
+      BlockingQueue<CallRunner> q) {
+    return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
+  }
+
+  @Override
+  public boolean dispatch(CallRunner callTask) throws InterruptedException {
+    FastPathHandler handler = popReadyHandler();
+    return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
+  }
+
+  /**
+   * @return Pop a Handler instance if one available ready-to-go or else return null.
+   */
+  private FastPathHandler popReadyHandler() {
+    return this.fastPathHandlerStack.poll();
+  }
+
+  class FastPathHandler extends Handler {
+    // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
+    // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
+    final Deque<FastPathHandler> fastPathHandlerStack;
+    // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
+    private Semaphore semaphore = new Semaphore(1);
+    // The task we get when fast-pathing.
+    private CallRunner loadedCallRunner;
+
+    FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
+        final Deque<FastPathHandler> fastPathHandlerStack) {
+      super(name, handlerFailureThreshhold, q);
+      this.fastPathHandlerStack = fastPathHandlerStack;
+      this.semaphore.drainPermits();
+    }
+
+    protected CallRunner getCallRunner() throws InterruptedException {
+      // Get a callrunner if one in the Q.
+      CallRunner cr = this.q.poll();
+      if (cr == null) {
+        // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
+        // the fastpath handoff done via fastPathHandlerStack.
+        if (this.fastPathHandlerStack != null) {
+          this.fastPathHandlerStack.push(this);
+          this.semaphore.acquire();
+          cr = this.loadedCallRunner;
+        } else {
+          // No fastpath available. Block until a task comes available.
+          cr = super.getCallRunner();
+        }
+      }
+      return cr;
+    }
+
+    /**
+     * @param cr Task gotten via fastpath.
+     * @return True if we successfully loaded our task
+     */
+    boolean loadCallRunner(final CallRunner cr) {
+      this.loadedCallRunner = cr;
+      this.semaphore.release();
+      return true;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8c46caac/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 40c11aa..9e0241c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -30,15 +31,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
+/**
+ * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
+ * scheduling behavior.
+ */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public abstract class RpcExecutor {
   private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
 
@@ -47,7 +50,7 @@ public abstract class RpcExecutor {
   protected volatile int currentQueueLimit;
 
   private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
-  private final List<Thread> handlers;
+  private final List<Handler> handlers;
   private final int handlerCount;
   private final String name;
   private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
@@ -58,7 +61,7 @@ public abstract class RpcExecutor {
   private Abortable abortable = null;
 
   public RpcExecutor(final String name, final int handlerCount) {
-    this.handlers = new ArrayList<Thread>(handlerCount);
+    this.handlers = new ArrayList<Handler>(handlerCount);
     this.handlerCount = handlerCount;
     this.name = Strings.nullToEmpty(name);
   }
@@ -100,75 +103,111 @@ public abstract class RpcExecutor {
     startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
   }
 
+  /**
+   * Override if providing alternate Handler implementation.
+   */
+  protected Handler getHandler(final String name, final double handlerFailureThreshhold,
+      final BlockingQueue<CallRunner> q) {
+    return new Handler(name, handlerFailureThreshhold, q);
+  }
+
+  /**
+   * Start up our handlers.
+   */
   protected void startHandlers(final String nameSuffix, final int numHandlers,
       final List<BlockingQueue<CallRunner>> callQueues,
       final int qindex, final int qsize, final int port) {
     final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
+    double handlerFailureThreshhold =
+        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
     for (int i = 0; i < numHandlers; i++) {
       final int index = qindex + (i % qsize);
-      Thread t = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          consumerLoop(callQueues.get(index));
-        }
-      });
-      t.setDaemon(true);
-      t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
-        ",queue=" + index + ",port=" + port);
-      t.start();
-      LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
-      handlers.add(t);
+      String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" +
+          index + ",port=" + port;
+      Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
+      handler.start();
+      LOG.debug("Started " + name);
+      handlers.add(handler);
     }
   }
 
-  protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
-    boolean interrupted = false;
-    double handlerFailureThreshhold =
-        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
-          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
-    try {
-      while (running) {
-        try {
-          MonitoredRPCHandler status = RpcServer.getStatus();
-          CallRunner task = myQueue.take();
-          task.setStatus(status);
+  /**
+   * Handler thread to run the {@link CallRunner#run()} in.
+   */
+  protected class Handler extends Thread {
+    /**
+     * Q to find CallRunners to run in.
+     */
+    final BlockingQueue<CallRunner> q;
+
+    final double handlerFailureThreshhold;
+
+    Handler(final String name, final double handlerFailureThreshhold,
+        final BlockingQueue<CallRunner> q) {
+      super(name);
+      setDaemon(true);
+      this.q = q;
+      this.handlerFailureThreshhold = handlerFailureThreshhold;
+    }
+
+    /**
+     * @return A {@link CallRunner}
+     * @throws InterruptedException
+     */
+    protected CallRunner getCallRunner() throws InterruptedException {
+      return this.q.take();
+    }
+
+    @Override
+    public void run() {
+      boolean interrupted = false;
+      try {
+        while (running) {
           try {
-            activeHandlerCount.incrementAndGet();
-            task.run();
-          } catch (Throwable e) {
-            if (e instanceof Error) {
-              int failedCount = failedHandlerCount.incrementAndGet();
-              if (handlerFailureThreshhold >= 0
-                  && failedCount > handlerCount * handlerFailureThreshhold) {
-                String message =
-                    "Number of failed RpcServer handler exceeded threshhold "
-                        + handlerFailureThreshhold + "  with failed reason: "
-                        + StringUtils.stringifyException(e);
-                if (abortable != null) {
-                  abortable.abort(message, e);
-                } else {
-                  LOG.error("Received " + StringUtils.stringifyException(e)
-                    + " but not aborting due to abortable being null");
-                  throw e;
-                }
-              } else {
-                LOG.warn("RpcServer handler threads encountered errors "
-                    + StringUtils.stringifyException(e));
-              }
+            run(getCallRunner());
+          } catch (InterruptedException e) {
+            interrupted = true;
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn(e);
+        throw e;
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    private void run(CallRunner cr) {
+      MonitoredRPCHandler status = RpcServer.getStatus();
+      cr.setStatus(status);
+      try {
+        activeHandlerCount.incrementAndGet();
+        cr.run();
+      } catch (Throwable e) {
+        if (e instanceof Error) {
+          int failedCount = failedHandlerCount.incrementAndGet();
+          if (this.handlerFailureThreshhold >= 0 &&
+              failedCount > handlerCount * this.handlerFailureThreshhold) {
+            String message = "Number of failed RpcServer handler runs exceeded threshhold " +
+              this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
+            if (abortable != null) {
+              abortable.abort(message, e);
             } else {
-              LOG.warn("RpcServer handler threads encountered exceptions "
-                  + StringUtils.stringifyException(e));
+              LOG.error("Error but can't abort because abortable is null: " +
+                  StringUtils.stringifyException(e));
+              throw e;
             }
-          } finally {
-            activeHandlerCount.decrementAndGet();
+          } else {
+            LOG.warn("Handler errors " + StringUtils.stringifyException(e));
           }
-        } catch (InterruptedException e) {
-          interrupted = true;
+        } else {
+          LOG.warn("Handler  exception " + StringUtils.stringifyException(e));
         }
-      }
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
+      } finally {
+        activeHandlerCount.decrementAndGet();
       }
     }
   }
@@ -193,7 +232,6 @@ public abstract class RpcExecutor {
    * All requests go to the first queue, at index 0
    */
   private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
-
     @Override
     public int getNextQueue() {
       return 0;
@@ -226,4 +264,4 @@ public abstract class RpcExecutor {
     }
     currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8c46caac/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 19f36ab..7337b92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -96,7 +96,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
       CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
 
-    if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+    if (isCodelQueueType(callQueueType)) {
       // update CoDel Scheduler tunables
       int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
         CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
@@ -200,49 +200,58 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
 
     if (numCallQueues > 1 && callqReadShare > 0) {
       // multiple read/write queues
-      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+      if (isDeadlineQueueType(callQueueType)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
-        callExecutor = new RWQueueRpcExecutor("RW.deadline.Q", handlerCount, numCallQueues,
+        callExecutor = new RWQueueRpcExecutor("DeadlineRWQ.default", handlerCount, numCallQueues,
             callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
             BoundedPriorityBlockingQueue.class, callPriority);
-      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+      } else if (isCodelQueueType(callQueueType)) {
         Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
           codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
-        callExecutor = new RWQueueRpcExecutor("RW.codel.Q", handlerCount,
+        callExecutor = new RWQueueRpcExecutor("CodelRWQ.default", handlerCount,
           numCallQueues, callqReadShare, callqScanShare,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
       } else {
-        callExecutor = new RWQueueRpcExecutor("RW.fifo.Q", handlerCount, numCallQueues,
+        // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
+        callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues,
           callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
       }
     } else {
       // multiple queues
-      if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
+      if (isDeadlineQueueType(callQueueType)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor =
-          new BalancedQueueRpcExecutor("B.deadline.Q", handlerCount, numCallQueues,
+          new BalancedQueueRpcExecutor("DeadlineBQ.default", handlerCount, numCallQueues,
             conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
-      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+      } else if (isCodelQueueType(callQueueType)) {
         callExecutor =
-          new BalancedQueueRpcExecutor("B.codel.Q", handlerCount, numCallQueues,
+          new BalancedQueueRpcExecutor("CodelBQ.default", handlerCount, numCallQueues,
             conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
             codelTargetDelay, codelInterval, codelLifoThreshold,
             numGeneralCallsDropped, numLifoModeSwitches);
       } else {
-        callExecutor = new BalancedQueueRpcExecutor("B.fifo.Q", handlerCount,
-            numCallQueues, maxQueueLength, conf, abortable);
+        // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
+        callExecutor = new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
+            handlerCount, numCallQueues, maxQueueLength, conf, abortable);
       }
     }
 
     // Create 2 queues to help priorityExecutor be more scalable.
-    this.priorityExecutor = priorityHandlerCount > 0 ?
-      new BalancedQueueRpcExecutor("B.priority.fifo.Q", priorityHandlerCount, 2,
-          maxPriorityQueueLength):
-      null;
-   this.replicationExecutor =
-     replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("B.replication.fifo.Q",
-       replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
+    this.priorityExecutor = priorityHandlerCount > 0?
+      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
+         2, maxPriorityQueueLength, conf, abortable): null;
+    this.replicationExecutor = replicationHandlerCount > 0?
+      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
+        replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
+  }
+
+  private static boolean isDeadlineQueueType(final String callQueueType) {
+    return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
+  }
+
+  private static boolean isCodelQueueType(final String callQueueType) {
+    return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
   }
 
   public SimpleRpcScheduler(

http://git-wip-us.apache.org/repos/asf/hbase/blob/8c46caac/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 561af58..1fda747 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -41,7 +41,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,7 +54,6 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -238,7 +236,7 @@ public class TestSimpleRpcScheduler {
       // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
       if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         assertEquals(530, totalTime);
-      } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ {
+      } else if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
         assertEquals(930, totalTime);
       }
     } finally {