You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by re...@apache.org on 2022/01/18 03:13:53 UTC
[hbase] branch branch-2.5 updated: HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#4036)
This is an automated email from the ASF dual-hosted git repository.
reidchan pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 540fbe5 HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#4036)
540fbe5 is described below
commit 540fbe51b05482a74a4243c8a8d5fdf919a11689
Author: Yutong Xiao <yu...@gmail.com>
AuthorDate: Tue Jan 18 11:13:13 2022 +0800
HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#4036)
Signed-off-by: Reid Chan <re...@apache.org>
---
.../ipc/FastPathBalancedQueueRpcExecutor.java | 65 ++--------
.../hbase/ipc/FastPathRWQueueRpcExecutor.java | 70 +++++++++++
.../hadoop/hbase/ipc/FastPathRpcHandler.java | 76 ++++++++++++
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 19 ++-
.../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 107 ++---------------
.../org/apache/hadoop/hbase/ipc/RpcHandler.java | 131 +++++++++++++++++++++
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 2 +-
.../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 2 +-
8 files changed, 314 insertions(+), 158 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
index 1db5408..2fd6456 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -41,7 +40,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
/*
* Stack of Handlers waiting for work.
*/
- private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
+ private final Deque<FastPathRpcHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
@@ -56,10 +55,12 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
}
@Override
- protected Handler getHandler(String name, double handlerFailureThreshhold,
- BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
- return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
- fastPathHandlerStack);
+ protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
+ final int handlerCount, final BlockingQueue<CallRunner> q,
+ final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
+ final Abortable abortable) {
+ return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
+ activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack);
}
@Override
@@ -69,62 +70,14 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
if (currentQueueLimit == 0){
return false;
}
- FastPathHandler handler = popReadyHandler();
+ FastPathRpcHandler handler = popReadyHandler();
return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
}
/**
* @return Pop a Handler instance if one available ready-to-go or else return null.
*/
- private FastPathHandler popReadyHandler() {
+ private FastPathRpcHandler popReadyHandler() {
return this.fastPathHandlerStack.poll();
}
-
- class FastPathHandler extends Handler {
- // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
- // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
- final Deque<FastPathHandler> fastPathHandlerStack;
- // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
- // UNFAIR synchronization.
- private Semaphore semaphore = new Semaphore(0);
- // The task we get when fast-pathing.
- private CallRunner loadedCallRunner;
-
- FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
- final AtomicInteger activeHandlerCount,
- final Deque<FastPathHandler> fastPathHandlerStack) {
- super(name, handlerFailureThreshhold, q, activeHandlerCount);
- this.fastPathHandlerStack = fastPathHandlerStack;
- }
-
- @Override
- protected CallRunner getCallRunner() throws InterruptedException {
- // Get a callrunner if one in the Q.
- CallRunner cr = this.q.poll();
- if (cr == null) {
- // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
- // the fastpath handoff done via fastPathHandlerStack.
- if (this.fastPathHandlerStack != null) {
- this.fastPathHandlerStack.push(this);
- this.semaphore.acquire();
- cr = this.loadedCallRunner;
- this.loadedCallRunner = null;
- } else {
- // No fastpath available. Block until a task comes available.
- cr = super.getCallRunner();
- }
- }
- return cr;
- }
-
- /**
- * @param cr Task gotten via fastpath.
- * @return True if we successfully loaded our task
- */
- boolean loadCallRunner(final CallRunner cr) {
- this.loadedCallRunner = cr;
- this.semaphore.release();
- return true;
- }
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java
new file mode 100644
index 0000000..5d09e66
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
+ * {@link FastPathBalancedQueueRpcExecutor}.
+ */
+@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
+ private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
+
+ private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
+ private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
+ private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();
+
+ public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
+ PriorityFunction priority, Configuration conf, Abortable abortable) {
+ super(name, handlerCount, maxQueueLength, priority, conf, abortable);
+ }
+
+ @Override
+ protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
+ final int handlerCount, final BlockingQueue<CallRunner> q,
+ final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
+ final Abortable abortable) {
+ Deque<FastPathRpcHandler> handlerStack = name.contains("read") ? readHandlerStack :
+ name.contains("write") ? writeHandlerStack : scanHandlerStack;
+ return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
+ activeHandlerCount, failedHandlerCount, abortable, handlerStack);
+ }
+
+ @Override public boolean dispatch(final CallRunner callTask) throws InterruptedException {
+ RpcCall call = callTask.getRpcCall();
+ boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
+ boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
+ FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll() :
+ shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll();
+ return handler != null ? handler.loadCallRunner(callTask) :
+ dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java
new file mode 100644
index 0000000..6d20aeb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class FastPathRpcHandler extends RpcHandler {
+ // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
+ // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
+ final Deque<FastPathRpcHandler> fastPathHandlerStack;
+ // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
+ // UNFAIR synchronization.
+ private Semaphore semaphore = new Semaphore(0);
+ // The task we get when fast-pathing.
+ private CallRunner loadedCallRunner;
+
+ FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount,
+ BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount,
+ AtomicInteger failedHandlerCount, final Abortable abortable,
+ final Deque<FastPathRpcHandler> fastPathHandlerStack) {
+ super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount,
+ abortable);
+ this.fastPathHandlerStack = fastPathHandlerStack;
+ }
+
+ @Override
+ protected CallRunner getCallRunner() throws InterruptedException {
+ // Get a callrunner if one in the Q.
+ CallRunner cr = this.q.poll();
+ if (cr == null) {
+ // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
+ // the fastpath handoff done via fastPathHandlerStack.
+ if (this.fastPathHandlerStack != null) {
+ this.fastPathHandlerStack.push(this);
+ this.semaphore.acquire();
+ cr = this.loadedCallRunner;
+ this.loadedCallRunner = null;
+ } else {
+ // No fastpath available. Block until a task comes available.
+ cr = super.getCallRunner();
+ }
+ }
+ return cr;
+ }
+
+ /**
+ * @param cr Task gotten via fastpath.
+ * @return True if we successfully loaded our task
+ */
+ boolean loadCallRunner(final CallRunner cr) {
+ this.loadedCallRunner = cr;
+ this.semaphore.release();
+ return true;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 5e7e2f8..fc450a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hbase.ipc;
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -130,16 +130,22 @@ public class RWQueueRpcExecutor extends RpcExecutor {
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcCall call = callTask.getRpcCall();
+ return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
+ shouldDispatchToScanQueue(callTask), callTask);
+ }
+
+ protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
+ final CallRunner callTask) {
int queueIndex;
- if (isWriteRequest(call.getHeader(), call.getParam())) {
+ if (toWriteQueue) {
queueIndex = writeBalancer.getNextQueue();
- } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
+ } else if (toScanQueue) {
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();
}
- BlockingQueue<CallRunner> queue = queues.get(queueIndex);
+ Queue<CallRunner> queue = queues.get(queueIndex);
if (queue.size() >= currentQueueLimit) {
return false;
}
@@ -232,6 +238,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
return param instanceof ScanRequest;
}
+ protected boolean shouldDispatchToScanQueue(final CallRunner task) {
+ RpcCall call = task.getRpcCall();
+ return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam());
+ }
+
protected float getReadShare(final Configuration conf) {
return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 1167a2a4..1d5a970 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -39,10 +39,8 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
@@ -98,12 +96,11 @@ public abstract class RpcExecutor {
protected volatile int currentQueueLimit;
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
- private final List<Handler> handlers;
+ private final List<RpcHandler> handlers;
private final int handlerCount;
private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
private String name;
- private boolean running;
private Configuration conf = null;
private Abortable abortable = null;
@@ -239,13 +236,12 @@ public abstract class RpcExecutor {
}
public void start(final int port) {
- running = true;
startHandlers(port);
}
public void stop() {
- running = false;
- for (Thread handler : handlers) {
+ for (RpcHandler handler : handlers) {
+ handler.stopRunning();
handler.interrupt();
}
}
@@ -266,9 +262,12 @@ public abstract class RpcExecutor {
/**
* Override if providing alternate Handler implementation.
*/
- protected Handler getHandler(final String name, final double handlerFailureThreshhold,
- final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
- return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
+ protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
+ final int handlerCount, final BlockingQueue<CallRunner> q,
+ final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
+ final Abortable abortable) {
+ return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount,
+ failedHandlerCount, abortable);
}
/**
@@ -285,8 +284,8 @@ public abstract class RpcExecutor {
final int index = qindex + (i % qsize);
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
+ ",port=" + port;
- Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
- activeHandlerCount);
+ RpcHandler handler = getHandler(name, handlerFailureThreshhold, handlerCount,
+ callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable);
handler.start();
handlers.add(handler);
}
@@ -294,90 +293,6 @@ public abstract class RpcExecutor {
handlers.size(), threadPrefix, qsize, port);
}
- /**
- * Handler thread run the {@link CallRunner#run()} in.
- */
- protected class Handler extends Thread {
- /**
- * Q to find CallRunners to run in.
- */
- final BlockingQueue<CallRunner> q;
-
- final double handlerFailureThreshhold;
-
- // metrics (shared with other handlers)
- final AtomicInteger activeHandlerCount;
-
- Handler(final String name, final double handlerFailureThreshhold,
- final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
- super(name);
- setDaemon(true);
- this.q = q;
- this.handlerFailureThreshhold = handlerFailureThreshhold;
- this.activeHandlerCount = activeHandlerCount;
- }
-
- /**
- * @return A {@link CallRunner}
- * @throws InterruptedException
- */
- protected CallRunner getCallRunner() throws InterruptedException {
- return this.q.take();
- }
-
- @Override
- public void run() {
- boolean interrupted = false;
- try {
- while (running) {
- try {
- run(getCallRunner());
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- } catch (Exception e) {
- LOG.warn(e.toString(), e);
- throw e;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private void run(CallRunner cr) {
- MonitoredRPCHandler status = RpcServer.getStatus();
- cr.setStatus(status);
- try {
- this.activeHandlerCount.incrementAndGet();
- cr.run();
- } catch (Throwable e) {
- if (e instanceof Error) {
- int failedCount = failedHandlerCount.incrementAndGet();
- if (this.handlerFailureThreshhold >= 0
- && failedCount > handlerCount * this.handlerFailureThreshhold) {
- String message = "Number of failed RpcServer handler runs exceeded threshhold "
- + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
- if (abortable != null) {
- abortable.abort(message, e);
- } else {
- LOG.error("Error but can't abort because abortable is null: "
- + StringUtils.stringifyException(e));
- throw e;
- }
- } else {
- LOG.warn("Handler errors " + StringUtils.stringifyException(e));
- }
- } else {
- LOG.warn("Handler exception " + StringUtils.stringifyException(e));
- }
- } finally {
- this.activeHandlerCount.decrementAndGet();
- }
- }
- }
-
public static abstract class QueueBalancer {
/**
* @return the index of the next queue to which a request should be inserted
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java
new file mode 100644
index 0000000..ce103e7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread to handle rpc call.
+ * Should only be used in {@link RpcExecutor} and its sub-classes.
+ */
+@InterfaceAudience.Private
+public class RpcHandler extends Thread {
+ private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class);
+
+ /**
+ * Q to find CallRunners to run in.
+ */
+ final BlockingQueue<CallRunner> q;
+
+ final int handlerCount;
+ final double handlerFailureThreshhold;
+
+ // metrics (shared with other handlers)
+ final AtomicInteger activeHandlerCount;
+ final AtomicInteger failedHandlerCount;
+
+ // The up-level RpcServer.
+ final Abortable abortable;
+
+ private boolean running;
+
+ RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount,
+ final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount,
+ final AtomicInteger failedHandlerCount, final Abortable abortable) {
+ super(name);
+ setDaemon(true);
+ this.q = q;
+ this.handlerFailureThreshhold = handlerFailureThreshhold;
+ this.activeHandlerCount = activeHandlerCount;
+ this.failedHandlerCount = failedHandlerCount;
+ this.handlerCount = handlerCount;
+ this.abortable = abortable;
+ }
+
+ /**
+ * @return A {@link CallRunner}
+ * @throws InterruptedException
+ */
+ protected CallRunner getCallRunner() throws InterruptedException {
+ return this.q.take();
+ }
+
+ public void stopRunning() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ boolean interrupted = false;
+ running = true;
+ try {
+ while (running) {
+ try {
+ run(getCallRunner());
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn(e.toString(), e);
+ throw e;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void run(CallRunner cr) {
+ MonitoredRPCHandler status = RpcServer.getStatus();
+ cr.setStatus(status);
+ try {
+ this.activeHandlerCount.incrementAndGet();
+ cr.run();
+ } catch (Throwable e) {
+ if (e instanceof Error) {
+ int failedCount = failedHandlerCount.incrementAndGet();
+ if (this.handlerFailureThreshhold >= 0
+ && failedCount > handlerCount * this.handlerFailureThreshhold) {
+ String message = "Number of failed RpcServer handler runs exceeded threshhold "
+ + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
+ if (abortable != null) {
+ abortable.abort(message, e);
+ } else {
+ LOG.error("Error but can't abort because abortable is null: "
+ + StringUtils.stringifyException(e));
+ throw e;
+ }
+ } else {
+ LOG.warn("Handler errors " + StringUtils.stringifyException(e));
+ }
+ } else {
+ LOG.warn("Handler exception " + StringUtils.stringifyException(e));
+ }
+ } finally {
+ this.activeHandlerCount.decrementAndGet();
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 9c1cb8b..1b8887a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -85,7 +85,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
if (callqReadShare > 0) {
// at least 1 read handler and 1 write handler
- callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
+ callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
} else {
if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 7d32f35..286094c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -662,7 +662,7 @@ public class TestSimpleRpcScheduler {
assertFalse(executor.dispatch(task));
//make sure we never internally get a handler, which would skip the queue validation
Mockito.verify(executor, Mockito.never()).getHandler(Mockito.any(), Mockito.anyDouble(),
- Mockito.any(), Mockito.any());
+ Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
}
@Test