You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by re...@apache.org on 2022/01/15 09:13:07 UTC

[hbase] branch branch-2 updated: Backport HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor to branch-2 (#4027)

This is an automated email from the ASF dual-hosted git repository.

reidchan pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 933fcdb  Backport HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor to branch-2 (#4027)
933fcdb is described below

commit 933fcdbe7c8357ac4d9679900dc38b403039f8e5
Author: Yutong Xiao <yu...@gmail.com>
AuthorDate: Sat Jan 15 17:12:07 2022 +0800

    Backport HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor to branch-2 (#4027)
    
    Signed-off-by: Reid Chan <re...@apache.org>
---
 .../ipc/FastPathBalancedQueueRpcExecutor.java      |  65 ++--------
 .../hbase/ipc/FastPathRWQueueRpcExecutor.java      |  71 +++++++++++
 .../hadoop/hbase/ipc/FastPathRpcHandler.java       |  76 ++++++++++++
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java       |  19 ++-
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java   | 107 ++---------------
 .../org/apache/hadoop/hbase/ipc/RpcHandler.java    | 131 +++++++++++++++++++++
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java       |   2 +-
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java   |   2 +-
 8 files changed, 315 insertions(+), 158 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
index 1db5408..e64ba4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
 import java.util.Deque;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,7 +40,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
   /*
    * Stack of Handlers waiting for work.
    */
-  private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
+  private final Deque<FastPathRpcHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
 
   public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
       final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
@@ -56,10 +55,12 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
   }
 
   @Override
-  protected Handler getHandler(String name, double handlerFailureThreshhold,
-      BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
-    return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
-        fastPathHandlerStack);
+  protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
+      final int handlerCount, final BlockingQueue<CallRunner> q,
+      final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
+      final Abortable abortable) {
+    return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
+      activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack);
   }
 
   @Override
@@ -69,62 +70,14 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
     if (currentQueueLimit == 0){
       return false;
     }
-    FastPathHandler handler = popReadyHandler();
+    FastPathRpcHandler handler = popReadyHandler();
     return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
   }
 
   /**
    * @return Pop a Handler instance if one available ready-to-go or else return null.
    */
-  private FastPathHandler popReadyHandler() {
+  private FastPathRpcHandler popReadyHandler() {
     return this.fastPathHandlerStack.poll();
   }
-
-  class FastPathHandler extends Handler {
-    // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
-    // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
-    final Deque<FastPathHandler> fastPathHandlerStack;
-    // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
-    // UNFAIR synchronization.
-    private Semaphore semaphore = new Semaphore(0);
-    // The task we get when fast-pathing.
-    private CallRunner loadedCallRunner;
-
-    FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
-        final AtomicInteger activeHandlerCount,
-        final Deque<FastPathHandler> fastPathHandlerStack) {
-      super(name, handlerFailureThreshhold, q, activeHandlerCount);
-      this.fastPathHandlerStack = fastPathHandlerStack;
-    }
-
-    @Override
-    protected CallRunner getCallRunner() throws InterruptedException {
-      // Get a callrunner if one in the Q.
-      CallRunner cr = this.q.poll();
-      if (cr == null) {
-        // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
-        // the fastpath handoff done via fastPathHandlerStack.
-        if (this.fastPathHandlerStack != null) {
-          this.fastPathHandlerStack.push(this);
-          this.semaphore.acquire();
-          cr = this.loadedCallRunner;
-          this.loadedCallRunner = null;
-        } else {
-          // No fastpath available. Block until a task comes available.
-          cr = super.getCallRunner();
-        }
-      }
-      return cr;
-    }
-
-    /**
-     * @param cr Task gotten via fastpath.
-     * @return True if we successfully loaded our task
-     */
-    boolean loadCallRunner(final CallRunner cr) {
-      this.loadedCallRunner = cr;
-      this.semaphore.release();
-      return true;
-    }
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java
new file mode 100644
index 0000000..cadce82
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
+ * {@link FastPathBalancedQueueRpcExecutor}.
+ */
+@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
+
+  private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
+  private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
+  private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();
+
+  public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
+    PriorityFunction priority, Configuration conf, Abortable abortable) {
+    super(name, handlerCount, maxQueueLength, priority, conf, abortable);
+  }
+
+  @Override
+  protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
+      final int handlerCount, final BlockingQueue<CallRunner> q,
+      final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
+      final Abortable abortable) {
+    Deque<FastPathRpcHandler> handlerStack = name.contains("read") ? readHandlerStack :
+      name.contains("write") ? writeHandlerStack : scanHandlerStack;
+    return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
+      activeHandlerCount, failedHandlerCount, abortable, handlerStack);
+  }
+
+  @Override
+  public boolean dispatch(final CallRunner callTask) throws InterruptedException {
+    RpcCall call = callTask.getRpcCall();
+    boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
+    boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
+    FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll() :
+      shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll();
+    return handler != null ? handler.loadCallRunner(callTask) :
+      dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java
new file mode 100644
index 0000000..3064c7a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class FastPathRpcHandler extends RpcHandler {
+  // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
+  // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
+  final Deque<FastPathRpcHandler> fastPathHandlerStack;
+  // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
+  // UNFAIR synchronization.
+  private Semaphore semaphore = new Semaphore(0);
+  // The task we get when fast-pathing.
+  private CallRunner loadedCallRunner;
+
+  FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount,
+      BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount,
+      AtomicInteger failedHandlerCount, final Abortable abortable,
+      final Deque<FastPathRpcHandler> fastPathHandlerStack) {
+    super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount,
+      abortable);
+    this.fastPathHandlerStack = fastPathHandlerStack;
+  }
+
+  @Override
+  protected CallRunner getCallRunner() throws InterruptedException {
+    // Get a callrunner if one in the Q.
+    CallRunner cr = this.q.poll();
+    if (cr == null) {
+      // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
+      // the fastpath handoff done via fastPathHandlerStack.
+      if (this.fastPathHandlerStack != null) {
+        this.fastPathHandlerStack.push(this);
+        this.semaphore.acquire();
+        cr = this.loadedCallRunner;
+        this.loadedCallRunner = null;
+      } else {
+        // No fastpath available. Block until a task comes available.
+        cr = super.getCallRunner();
+      }
+    }
+    return cr;
+  }
+
+  /**
+   * @param cr Task gotten via fastpath.
+   * @return True if we successfully loaded our task
+   */
+  boolean loadCallRunner(final CallRunner cr) {
+    this.loadedCallRunner = cr;
+    this.semaphore.release();
+    return true;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 5e7e2f8..fc450a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -19,7 +19,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -130,16 +130,22 @@ public class RWQueueRpcExecutor extends RpcExecutor {
   @Override
   public boolean dispatch(final CallRunner callTask) throws InterruptedException {
     RpcCall call = callTask.getRpcCall();
+    return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
+      shouldDispatchToScanQueue(callTask), callTask);
+  }
+
+  protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
+      final CallRunner callTask) {
     int queueIndex;
-    if (isWriteRequest(call.getHeader(), call.getParam())) {
+    if (toWriteQueue) {
       queueIndex = writeBalancer.getNextQueue();
-    } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
+    } else if (toScanQueue) {
       queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
     } else {
       queueIndex = numWriteQueues + readBalancer.getNextQueue();
     }
 
-    BlockingQueue<CallRunner> queue = queues.get(queueIndex);
+    Queue<CallRunner> queue = queues.get(queueIndex);
     if (queue.size() >= currentQueueLimit) {
       return false;
     }
@@ -232,6 +238,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     return param instanceof ScanRequest;
   }
 
+  protected boolean shouldDispatchToScanQueue(final CallRunner task) {
+    RpcCall call = task.getRpcCall();
+    return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam());
+  }
+
   protected float getReadShare(final Configuration conf) {
     return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 1167a2a4..a215d9e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -39,10 +39,8 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Strings;
@@ -98,12 +96,11 @@ public abstract class RpcExecutor {
   protected volatile int currentQueueLimit;
 
   private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
-  private final List<Handler> handlers;
+  private final List<RpcHandler> handlers;
   private final int handlerCount;
   private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
 
   private String name;
-  private boolean running;
 
   private Configuration conf = null;
   private Abortable abortable = null;
@@ -239,13 +236,12 @@ public abstract class RpcExecutor {
   }
 
   public void start(final int port) {
-    running = true;
     startHandlers(port);
   }
 
   public void stop() {
-    running = false;
-    for (Thread handler : handlers) {
+    for (RpcHandler handler : handlers) {
+      handler.stopRunning();
       handler.interrupt();
     }
   }
@@ -266,9 +262,12 @@ public abstract class RpcExecutor {
   /**
    * Override if providing alternate Handler implementation.
    */
-  protected Handler getHandler(final String name, final double handlerFailureThreshhold,
-      final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
-    return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
+  protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
+      final int handlerCount, final BlockingQueue<CallRunner> q,
+      final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
+      final Abortable abortable) {
+    return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount,
+      failedHandlerCount, abortable);
   }
 
   /**
@@ -285,8 +284,8 @@ public abstract class RpcExecutor {
       final int index = qindex + (i % qsize);
       String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
           + ",port=" + port;
-      Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
-        activeHandlerCount);
+      RpcHandler handler = getHandler(name, handlerFailureThreshhold, handlerCount,
+        callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable);
       handler.start();
       handlers.add(handler);
     }
@@ -294,90 +293,6 @@ public abstract class RpcExecutor {
         handlers.size(), threadPrefix, qsize, port);
   }
 
-  /**
-   * Handler thread run the {@link CallRunner#run()} in.
-   */
-  protected class Handler extends Thread {
-    /**
-     * Q to find CallRunners to run in.
-     */
-    final BlockingQueue<CallRunner> q;
-
-    final double handlerFailureThreshhold;
-
-    // metrics (shared with other handlers)
-    final AtomicInteger activeHandlerCount;
-
-    Handler(final String name, final double handlerFailureThreshhold,
-        final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
-      super(name);
-      setDaemon(true);
-      this.q = q;
-      this.handlerFailureThreshhold = handlerFailureThreshhold;
-      this.activeHandlerCount = activeHandlerCount;
-    }
-
-    /**
-     * @return A {@link CallRunner}
-     * @throws InterruptedException
-     */
-    protected CallRunner getCallRunner() throws InterruptedException {
-      return this.q.take();
-    }
-
-    @Override
-    public void run() {
-      boolean interrupted = false;
-      try {
-        while (running) {
-          try {
-            run(getCallRunner());
-          } catch (InterruptedException e) {
-            interrupted = true;
-          }
-        }
-      } catch (Exception e) {
-        LOG.warn(e.toString(), e);
-        throw e;
-      } finally {
-        if (interrupted) {
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-
-    private void run(CallRunner cr) {
-      MonitoredRPCHandler status = RpcServer.getStatus();
-      cr.setStatus(status);
-      try {
-        this.activeHandlerCount.incrementAndGet();
-        cr.run();
-      } catch (Throwable e) {
-        if (e instanceof Error) {
-          int failedCount = failedHandlerCount.incrementAndGet();
-          if (this.handlerFailureThreshhold >= 0
-              && failedCount > handlerCount * this.handlerFailureThreshhold) {
-            String message = "Number of failed RpcServer handler runs exceeded threshhold "
-                + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
-            if (abortable != null) {
-              abortable.abort(message, e);
-            } else {
-              LOG.error("Error but can't abort because abortable is null: "
-                  + StringUtils.stringifyException(e));
-              throw e;
-            }
-          } else {
-            LOG.warn("Handler errors " + StringUtils.stringifyException(e));
-          }
-        } else {
-          LOG.warn("Handler  exception " + StringUtils.stringifyException(e));
-        }
-      } finally {
-        this.activeHandlerCount.decrementAndGet();
-      }
-    }
-  }
-
   public static abstract class QueueBalancer {
     /**
      * @return the index of the next queue to which a request should be inserted
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java
new file mode 100644
index 0000000..b99a2c9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread to handle rpc call.
+ * Should only be used in {@link RpcExecutor} and its sub-classes.
+ */
+@InterfaceAudience.Private
+public class RpcHandler extends Thread {
+  private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class);
+
+  /**
+   * Q to find CallRunners to run in.
+   */
+  final BlockingQueue<CallRunner> q;
+
+  final int handlerCount;
+  final double handlerFailureThreshhold;
+
+  // metrics (shared with other handlers)
+  final AtomicInteger activeHandlerCount;
+  final AtomicInteger failedHandlerCount;
+
+  // The up-level RpcServer.
+  final Abortable abortable;
+
+  private boolean running;
+
+  RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount,
+      final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount,
+      final AtomicInteger failedHandlerCount, final Abortable abortable) {
+    super(name);
+    setDaemon(true);
+    this.q = q;
+    this.handlerFailureThreshhold = handlerFailureThreshhold;
+    this.activeHandlerCount = activeHandlerCount;
+    this.failedHandlerCount = failedHandlerCount;
+    this.handlerCount = handlerCount;
+    this.abortable = abortable;
+  }
+
+  /**
+   * @return A {@link CallRunner}
+   * @throws InterruptedException thrown by {@link BlockingQueue#take()}
+   */
+  protected CallRunner getCallRunner() throws InterruptedException {
+    return this.q.take();
+  }
+
+  public void stopRunning() {
+    running = false;
+  }
+
+  @Override
+  public void run() {
+    boolean interrupted = false;
+    running = true;
+    try {
+      while (running) {
+        try {
+          run(getCallRunner());
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn(e.toString(), e);
+      throw e;
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private void run(CallRunner cr) {
+    MonitoredRPCHandler status = RpcServer.getStatus();
+    cr.setStatus(status);
+    try {
+      this.activeHandlerCount.incrementAndGet();
+      cr.run();
+    } catch (Throwable e) {
+      if (e instanceof Error) {
+        int failedCount = failedHandlerCount.incrementAndGet();
+        if (this.handlerFailureThreshhold >= 0
+          && failedCount > handlerCount * this.handlerFailureThreshhold) {
+          String message = "Number of failed RpcServer handler runs exceeded threshhold "
+            + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
+          if (abortable != null) {
+            abortable.abort(message, e);
+          } else {
+            LOG.error("Error but can't abort because abortable is null: "
+              + StringUtils.stringifyException(e));
+            throw e;
+          }
+        } else {
+          LOG.warn("Handler errors " + StringUtils.stringifyException(e));
+        }
+      } else {
+        LOG.warn("Handler  exception " + StringUtils.stringifyException(e));
+      }
+    } finally {
+      this.activeHandlerCount.decrementAndGet();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 9c1cb8b..1b8887a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -85,7 +85,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
 
     if (callqReadShare > 0) {
       // at least 1 read handler and 1 write handler
-      callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
+      callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
         maxQueueLength, priority, conf, server);
     } else {
       if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 7d32f35..286094c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -662,7 +662,7 @@ public class TestSimpleRpcScheduler {
     assertFalse(executor.dispatch(task));
     //make sure we never internally get a handler, which would skip the queue validation
     Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
-      Mockito.any(), Mockito.any());
+      Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
   }
 
   @Test