You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by re...@apache.org on 2022/01/17 02:44:41 UTC
[hbase] branch branch-1 updated: HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#4028)
This is an automated email from the ASF dual-hosted git repository.
reidchan pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 598b453 HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#4028)
598b453 is described below
commit 598b453a4100120827e91b9c6a82f3da1ba4e2bf
Author: Yutong Xiao <yu...@gmail.com>
AuthorDate: Mon Jan 17 10:44:03 2022 +0800
HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#4028)
Signed-off-by: Reid Chan <re...@apache.org>
---
.../ipc/FastPathBalancedQueueRpcExecutor.java | 63 ++--------
.../hbase/ipc/FastPathRWQueueRpcExecutor.java | 71 +++++++++++
.../hadoop/hbase/ipc/FastPathRpcHandler.java | 76 ++++++++++++
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 28 ++++-
.../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 109 ++---------------
.../org/apache/hadoop/hbase/ipc/RpcHandler.java | 131 +++++++++++++++++++++
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 2 +-
.../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 4 +-
8 files changed, 326 insertions(+), 158 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
index 724d828..11674b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -41,7 +40,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
/*
* Stack of Handlers waiting for work.
*/
- private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
+ private final Deque<FastPathRpcHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
@@ -57,10 +56,12 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
}
@Override
- protected Handler getHandler(String name, double handlerFailureThreshhold,
- BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
- return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
- fastPathHandlerStack);
+ protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
+ final int handlerCount, final BlockingQueue<CallRunner> q,
+ final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
+ final Abortable abortable) {
+ return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
+ activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack);
}
@Override
@@ -70,60 +71,14 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
if (currentQueueLimit == 0){
return false;
}
- FastPathHandler handler = popReadyHandler();
+ FastPathRpcHandler handler = popReadyHandler();
return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
}
/**
* @return Pop a Handler instance if one available ready-to-go or else return null.
*/
- private FastPathHandler popReadyHandler() {
+ private FastPathRpcHandler popReadyHandler() {
return this.fastPathHandlerStack.poll();
}
-
- class FastPathHandler extends Handler {
- // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
- // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
- final Deque<FastPathHandler> fastPathHandlerStack;
- // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
- private Semaphore semaphore = new Semaphore(0);
- // The task we get when fast-pathing.
- private CallRunner loadedCallRunner;
-
- FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
- final AtomicInteger activeHandlerCount,
- final Deque<FastPathHandler> fastPathHandlerStack) {
- super(name, handlerFailureThreshhold, q, activeHandlerCount);
- this.fastPathHandlerStack = fastPathHandlerStack;
- }
-
- protected CallRunner getCallRunner() throws InterruptedException {
- // Get a callrunner if one in the Q.
- CallRunner cr = this.q.poll();
- if (cr == null) {
- // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
- // the fastpath handoff done via fastPathHandlerStack.
- if (this.fastPathHandlerStack != null) {
- this.fastPathHandlerStack.push(this);
- this.semaphore.acquire();
- cr = this.loadedCallRunner;
- this.loadedCallRunner = null;
- } else {
- // No fastpath available. Block until a task comes available.
- cr = super.getCallRunner();
- }
- }
- return cr;
- }
-
- /**
- * @param task Task gotten via fastpath.
- * @return True if we successfully loaded our task
- */
- boolean loadCallRunner(final CallRunner cr) {
- this.loadedCallRunner = cr;
- this.semaphore.release();
- return true;
- }
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java
new file mode 100644
index 0000000..54751de
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
+ * {@link FastPathBalancedQueueRpcExecutor}.
+ */
+@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
+ private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
+
+ private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
+ private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
+ private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();
+
+ public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
+ PriorityFunction priority, Configuration conf, Abortable abortable) {
+ super(name, handlerCount, maxQueueLength, priority, conf, abortable);
+ }
+
+ @Override
+ protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
+ final int handlerCount, final BlockingQueue<CallRunner> q,
+ final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
+ final Abortable abortable) {
+ Deque<FastPathRpcHandler> handlerStack = name.contains("read") ? readHandlerStack :
+ name.contains("write") ? writeHandlerStack : scanHandlerStack;
+ return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
+ activeHandlerCount, failedHandlerCount, abortable, handlerStack);
+ }
+
+ @Override
+ public boolean dispatch(final CallRunner callTask) throws InterruptedException {
+ RpcServer.Call call = callTask.getCall();
+ boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.param);
+ boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
+ FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll() :
+ shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll();
+ return handler != null ? handler.loadCallRunner(callTask) :
+ dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java
new file mode 100644
index 0000000..13edd08
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRpcHandler.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class FastPathRpcHandler extends RpcHandler {
+ // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
+ // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
+ final Deque<FastPathRpcHandler> fastPathHandlerStack;
+ // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
+ // UNFAIR synchronization.
+ private Semaphore semaphore = new Semaphore(0);
+ // The task we get when fast-pathing.
+ private CallRunner loadedCallRunner;
+
+ FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount,
+ BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount,
+ AtomicInteger failedHandlerCount, final Abortable abortable,
+ final Deque<FastPathRpcHandler> fastPathHandlerStack) {
+ super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount,
+ abortable);
+ this.fastPathHandlerStack = fastPathHandlerStack;
+ }
+
+ @Override
+ protected CallRunner getCallRunner() throws InterruptedException {
+ // Get a callrunner if one in the Q.
+ CallRunner cr = this.q.poll();
+ if (cr == null) {
+ // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
+ // the fastpath handoff done via fastPathHandlerStack.
+ if (this.fastPathHandlerStack != null) {
+ this.fastPathHandlerStack.push(this);
+ this.semaphore.acquire();
+ cr = this.loadedCallRunner;
+ this.loadedCallRunner = null;
+ } else {
+ // No fastpath available. Block until a task comes available.
+ cr = super.getCallRunner();
+ }
+ }
+ return cr;
+ }
+
+ /**
+ * @param cr Task gotten via fastpath.
+ * @return True if we successfully loaded our task
+ */
+ boolean loadCallRunner(final CallRunner cr) {
+ this.loadedCallRunner = cr;
+ this.semaphore.release();
+ return true;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 9feef25..234c7a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.ipc;
-import java.util.List;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -251,16 +251,22 @@ public class RWQueueRpcExecutor extends RpcExecutor {
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall();
+ return dispatchTo(isWriteRequest(call.getHeader(), call.param),
+ shouldDispatchToScanQueue(callTask), callTask);
+ }
+
+ protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
+ final CallRunner callTask) {
int queueIndex;
- if (isWriteRequest(call.getHeader(), call.param)) {
+ if (toWriteQueue) {
queueIndex = writeBalancer.getNextQueue();
- } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
+ } else if (toScanQueue) {
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();
}
- BlockingQueue<CallRunner> queue = queues.get(queueIndex);
+ Queue<CallRunner> queue = queues.get(queueIndex);
if (queue.size() >= currentQueueLimit) {
return false;
}
@@ -316,7 +322,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
return activeScanHandlerCount.get();
}
- private boolean isWriteRequest(final RequestHeader header, final Message param) {
+ protected boolean isWriteRequest(final RequestHeader header, final Message param) {
// TODO: Is there a better way to do this?
if (param instanceof MultiRequest) {
MultiRequest multi = (MultiRequest)param;
@@ -353,6 +359,18 @@ public class RWQueueRpcExecutor extends RpcExecutor {
return param instanceof ScanRequest;
}
+ protected boolean shouldDispatchToScanQueue(final CallRunner task) {
+ RpcServer.Call call = task.getCall();
+ return numScanQueues > 0 && isScanRequest(call.getHeader(), call.param);
+ }
+
+ protected float getReadShare(final Configuration conf) {
+ return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
+ }
+
+ protected float getScanShare(final Configuration conf) {
+ return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
+ }
/*
* Calculate the number of writers based on the "total count" and the read share.
* You'll get at least one writer.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index d46786b..d6c4829 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -34,10 +34,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -89,12 +87,11 @@ public abstract class RpcExecutor {
protected volatile int currentQueueLimit;
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
- private final List<Handler> handlers;
+ private final List<RpcHandler> handlers;
private final int handlerCount;
private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
private String name;
- private boolean running;
private Configuration conf = null;
private Abortable abortable = null;
@@ -102,7 +99,7 @@ public abstract class RpcExecutor {
@Deprecated
public RpcExecutor(final String name, final int handlerCount, final int numCallQueues) {
this.name = Strings.nullToEmpty(name);
- this.handlers = new ArrayList<Handler>(handlerCount);
+ this.handlers = new ArrayList<RpcHandler>(handlerCount);
this.handlerCount = handlerCount;
this.numCallQueues = numCallQueues;
this.queues = new ArrayList<>(this.numCallQueues);
@@ -197,13 +194,12 @@ public abstract class RpcExecutor {
}
public void start(final int port) {
- running = true;
startHandlers(port);
}
public void stop() {
- running = false;
- for (Thread handler : handlers) {
+ for (RpcHandler handler : handlers) {
+ handler.stopRunning();
handler.interrupt();
}
}
@@ -224,9 +220,12 @@ public abstract class RpcExecutor {
/**
* Override if providing alternate Handler implementation.
*/
- protected Handler getHandler(final String name, final double handlerFailureThreshhold,
- final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
- return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
+ protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
+ final int handlerCount, final BlockingQueue<CallRunner> q,
+ final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
+ final Abortable abortable) {
+ return new RpcHandler(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount,
+ failedHandlerCount, abortable);
}
/**
@@ -243,98 +242,14 @@ public abstract class RpcExecutor {
final int index = qindex + (i % qsize);
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
+ ",port=" + port;
- Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
- activeHandlerCount);
+ RpcHandler handler = getHandler(name, handlerFailureThreshhold, handlerCount,
+ callQueues.get(index), activeHandlerCount, failedHandlerCount, abortable);
handler.start();
LOG.debug("Started " + name);
handlers.add(handler);
}
}
- /**
- * Handler thread run the {@link CallRunner#run()} in.
- */
- protected class Handler extends Thread {
- /**
- * Q to find CallRunners to run in.
- */
- final BlockingQueue<CallRunner> q;
-
- final double handlerFailureThreshhold;
-
- // metrics (shared with other handlers)
- final AtomicInteger activeHandlerCount;
-
- Handler(final String name, final double handlerFailureThreshhold,
- final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
- super(name);
- setDaemon(true);
- this.q = q;
- this.handlerFailureThreshhold = handlerFailureThreshhold;
- this.activeHandlerCount = activeHandlerCount;
- }
-
- /**
- * @return A {@link CallRunner}
- * @throws InterruptedException
- */
- protected CallRunner getCallRunner() throws InterruptedException {
- return this.q.take();
- }
-
- @Override
- public void run() {
- boolean interrupted = false;
- try {
- while (running) {
- try {
- run(getCallRunner());
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- } catch (Exception e) {
- LOG.warn(e);
- throw e;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private void run(CallRunner cr) {
- MonitoredRPCHandler status = RpcServer.getStatus();
- cr.setStatus(status);
- try {
- this.activeHandlerCount.incrementAndGet();
- cr.run();
- } catch (Throwable e) {
- if (e instanceof Error) {
- int failedCount = failedHandlerCount.incrementAndGet();
- if (this.handlerFailureThreshhold >= 0
- && failedCount > handlerCount * this.handlerFailureThreshhold) {
- String message = "Number of failed RpcServer handler runs exceeded threshhold "
- + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
- if (abortable != null) {
- abortable.abort(message, e);
- } else {
- LOG.error("Error but can't abort because abortable is null: "
- + StringUtils.stringifyException(e));
- throw e;
- }
- } else {
- LOG.warn("Handler errors " + StringUtils.stringifyException(e));
- }
- } else {
- LOG.warn("Handler exception " + StringUtils.stringifyException(e));
- }
- } finally {
- this.activeHandlerCount.decrementAndGet();
- }
- }
- }
-
public static abstract class QueueBalancer {
/**
* @return the index of the next queue to which a request should be inserted
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java
new file mode 100644
index 0000000..dbbaaa8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcHandler.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread to handle rpc call.
+ * Should only be used in {@link RpcExecutor} and its sub-classes.
+ */
+@InterfaceAudience.Private
+public class RpcHandler extends Thread {
+ private static final Logger LOG = LoggerFactory.getLogger(RpcHandler.class);
+
+ /**
+ * Q to find CallRunners to run in.
+ */
+ final BlockingQueue<CallRunner> q;
+
+ final int handlerCount;
+ final double handlerFailureThreshhold;
+
+ // metrics (shared with other handlers)
+ final AtomicInteger activeHandlerCount;
+ final AtomicInteger failedHandlerCount;
+
+ // The up-level RpcServer.
+ final Abortable abortable;
+
+ private boolean running;
+
+ RpcHandler(final String name, final double handlerFailureThreshhold, final int handlerCount,
+ final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount,
+ final AtomicInteger failedHandlerCount, final Abortable abortable) {
+ super(name);
+ setDaemon(true);
+ this.q = q;
+ this.handlerFailureThreshhold = handlerFailureThreshhold;
+ this.activeHandlerCount = activeHandlerCount;
+ this.failedHandlerCount = failedHandlerCount;
+ this.handlerCount = handlerCount;
+ this.abortable = abortable;
+ }
+
+ /**
+ * @return A {@link CallRunner}
+ * @throws InterruptedException Throws by {@link BlockingQueue#take()}
+ */
+ protected CallRunner getCallRunner() throws InterruptedException {
+ return this.q.take();
+ }
+
+ public void stopRunning() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ boolean interrupted = false;
+ running = true;
+ try {
+ while (running) {
+ try {
+ run(getCallRunner());
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn(e.toString(), e);
+ throw e;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void run(CallRunner cr) {
+ MonitoredRPCHandler status = RpcServer.getStatus();
+ cr.setStatus(status);
+ try {
+ this.activeHandlerCount.incrementAndGet();
+ cr.run();
+ } catch (Throwable e) {
+ if (e instanceof Error) {
+ int failedCount = failedHandlerCount.incrementAndGet();
+ if (this.handlerFailureThreshhold >= 0
+ && failedCount > handlerCount * this.handlerFailureThreshhold) {
+ String message = "Number of failed RpcServer handler runs exceeded threshhold "
+ + this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
+ if (abortable != null) {
+ abortable.abort(message, e);
+ } else {
+ LOG.error("Error but can't abort because abortable is null: "
+ + StringUtils.stringifyException(e));
+ throw e;
+ }
+ } else {
+ LOG.warn("Handler errors " + StringUtils.stringifyException(e));
+ }
+ } else {
+ LOG.warn("Handler exception " + StringUtils.stringifyException(e));
+ }
+ } finally {
+ this.activeHandlerCount.decrementAndGet();
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index dbe4667..049d33b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -82,7 +82,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
if (callqReadShare > 0) {
// at least 1 read handler and 1 write handler
- callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
+ callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
} else {
if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index d18e167..602a722 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -498,8 +498,10 @@ public class TestSimpleRpcScheduler {
CallRunner task = mock(CallRunner.class);
assertFalse(executor.dispatch(task));
//make sure we never internally get a handler, which would skip the queue validation
+
Mockito.verify(executor, Mockito.never()).getHandler(Mockito.anyString(), Mockito.anyDouble(),
- (BlockingQueue<CallRunner>) Mockito.any(), (AtomicInteger) Mockito.any());
+ Mockito.anyInt(), (BlockingQueue<CallRunner>) Mockito.any(), (AtomicInteger) Mockito.any(),
+ (AtomicInteger) Mockito.any(), (Abortable) Mockito.any());
}
// Get mocked call that has the CallRunner sleep for a while so that the fast