You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/11/29 01:44:25 UTC
hbase git commit: HBASE-16561 Add metrics about read/write/scan queue
length and active read/write/scan handler count
Repository: hbase
Updated Branches:
refs/heads/master 3d5e68607 -> cc03f7ad5
HBASE-16561 Add metrics about read/write/scan queue length and active read/write/scan handler count
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cc03f7ad
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cc03f7ad
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cc03f7ad
Branch: refs/heads/master
Commit: cc03f7ad5320d9b91cd65e0630501d08d341ad74
Parents: 3d5e686
Author: Guanghao Zhang <zg...@gmail.com>
Authored: Fri Nov 25 14:13:21 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Nov 29 09:43:30 2016 +0800
----------------------------------------------------------------------
.../hbase/ipc/MetricsHBaseServerSource.java | 15 ++++
.../hbase/ipc/MetricsHBaseServerWrapper.java | 19 ++++++
.../hbase/ipc/MetricsHBaseServerSourceImpl.java | 14 +++-
.../ipc/FastPathBalancedQueueRpcExecutor.java | 9 ++-
.../hadoop/hbase/ipc/FifoRpcScheduler.java | 30 ++++++++
.../ipc/MetricsHBaseServerWrapperImpl.java | 48 +++++++++++++
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 67 +++++++++++++++---
.../apache/hadoop/hbase/ipc/RpcExecutor.java | 72 ++++++++++++++------
.../apache/hadoop/hbase/ipc/RpcScheduler.java | 18 +++++
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 36 ++++++++--
.../hbase/ipc/DelegatingRpcScheduler.java | 30 ++++++++
.../ipc/MetricsHBaseServerWrapperStub.java | 30 ++++++++
.../apache/hadoop/hbase/ipc/TestRpcMetrics.java | 7 +-
13 files changed, 354 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index cf9c6c7..cc0a60b 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -62,10 +62,25 @@ public interface MetricsHBaseServerSource extends BaseSource {
String REPLICATION_QUEUE_DESC =
"Number of calls in the replication call queue waiting to be run";
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
+ String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
+ String WRITE_QUEUE_DESC = "Number of calls in the write call queue; " +
+ "parsed requests waiting in scheduler to be executed";
+ String READ_QUEUE_NAME = "numCallsInReadQueue";
+ String READ_QUEUE_DESC = "Number of calls in the read call queue; " +
+ "parsed requests waiting in scheduler to be executed";
+ String SCAN_QUEUE_NAME = "numCallsInScanQueue";
+ String SCAN_QUEUE_DESC = "Number of calls in the scan call queue; " +
+ "parsed requests waiting in scheduler to be executed";
String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
String NUM_ACTIVE_HANDLER_DESC = "Number of active rpc handlers.";
+ String NUM_ACTIVE_WRITE_HANDLER_NAME = "numActiveWriteHandler";
+ String NUM_ACTIVE_WRITE_HANDLER_DESC = "Number of active write rpc handlers.";
+ String NUM_ACTIVE_READ_HANDLER_NAME = "numActiveReadHandler";
+ String NUM_ACTIVE_READ_HANDLER_DESC = "Number of active read rpc handlers.";
+ String NUM_ACTIVE_SCAN_HANDLER_NAME = "numActiveScanHandler";
+ String NUM_ACTIVE_SCAN_HANDLER_DESC = "Number of active scan rpc handlers.";
String NUM_GENERAL_CALLS_DROPPED_NAME = "numGeneralCallsDropped";
String NUM_GENERAL_CALLS_DROPPED_DESC = "Total number of calls in general queue which " +
"were dropped by CoDel RPC executor";
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
index 8f30205..b272cd0 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
@@ -21,11 +21,30 @@ package org.apache.hadoop.hbase.ipc;
public interface MetricsHBaseServerWrapper {
long getTotalQueueSize();
+
int getGeneralQueueLength();
+
int getReplicationQueueLength();
+
int getPriorityQueueLength();
+
int getNumOpenConnections();
+
int getActiveRpcHandlerCount();
+
long getNumGeneralCallsDropped();
+
long getNumLifoModeSwitches();
+
+ int getWriteQueueLength();
+
+ int getReadQueueLength();
+
+ int getScanQueueLength();
+
+ int getActiveWriteRpcHandlerCount();
+
+ int getActiveReadRpcHandlerCount();
+
+ int getActiveScanRpcHandlerCount();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index d372b1b..69aa5fe 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -239,7 +239,19 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.addCounter(Interns.info(NUM_GENERAL_CALLS_DROPPED_NAME,
NUM_GENERAL_CALLS_DROPPED_DESC), wrapper.getNumGeneralCallsDropped())
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME,
- NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches());
+ NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches())
+ .addGauge(Interns.info(WRITE_QUEUE_NAME, WRITE_QUEUE_DESC),
+ wrapper.getWriteQueueLength())
+ .addGauge(Interns.info(READ_QUEUE_NAME, READ_QUEUE_DESC),
+ wrapper.getReadQueueLength())
+ .addGauge(Interns.info(SCAN_QUEUE_NAME, SCAN_QUEUE_DESC),
+ wrapper.getScanQueueLength())
+ .addGauge(Interns.info(NUM_ACTIVE_WRITE_HANDLER_NAME, NUM_ACTIVE_WRITE_HANDLER_DESC),
+ wrapper.getActiveWriteRpcHandlerCount())
+ .addGauge(Interns.info(NUM_ACTIVE_READ_HANDLER_NAME, NUM_ACTIVE_READ_HANDLER_DESC),
+ wrapper.getActiveReadRpcHandlerCount())
+ .addGauge(Interns.info(NUM_ACTIVE_SCAN_HANDLER_NAME, NUM_ACTIVE_SCAN_HANDLER_DESC),
+ wrapper.getActiveScanRpcHandlerCount());
}
metricsRegistry.snapshot(mrb, all);
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
index 24ae40a..ab16627 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
@@ -21,6 +21,7 @@ import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
@@ -56,8 +57,9 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
@Override
protected Handler getHandler(String name, double handlerFailureThreshhold,
- BlockingQueue<CallRunner> q) {
- return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
+ BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
+ return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
+ fastPathHandlerStack);
}
@Override
@@ -83,8 +85,9 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
private CallRunner loadedCallRunner;
FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
+ final AtomicInteger activeHandlerCount,
final Deque<FastPathHandler> fastPathHandlerStack) {
- super(name, handlerFailureThreshhold, q);
+ super(name, handlerFailureThreshhold, q, activeHandlerCount);
this.fastPathHandlerStack = fastPathHandlerStack;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 70d903a..a9b6fd1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -118,4 +118,34 @@ public class FifoRpcScheduler extends RpcScheduler {
public long getNumLifoModeSwitches() {
return 0;
}
+
+ @Override
+ public int getWriteQueueLength() {
+ return 0;
+ }
+
+ @Override
+ public int getReadQueueLength() {
+ return 0;
+ }
+
+ @Override
+ public int getScanQueueLength() {
+ return 0;
+ }
+
+ @Override
+ public int getActiveWriteRpcHandlerCount() {
+ return 0;
+ }
+
+ @Override
+ public int getActiveReadRpcHandlerCount() {
+ return 0;
+ }
+
+ @Override
+ public int getActiveScanRpcHandlerCount() {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 0edf40f..4afcc33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -94,4 +94,52 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
}
return server.getScheduler().getNumLifoModeSwitches();
}
+
+ @Override
+ public int getWriteQueueLength() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getWriteQueueLength();
+ }
+
+ @Override
+ public int getReadQueueLength() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getReadQueueLength();
+ }
+
+ @Override
+ public int getScanQueueLength() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getScanQueueLength();
+ }
+
+ @Override
+ public int getActiveWriteRpcHandlerCount() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getActiveWriteRpcHandlerCount();
+ }
+
+ @Override
+ public int getActiveReadRpcHandlerCount() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getActiveReadRpcHandlerCount();
+ }
+
+ @Override
+ public int getActiveScanRpcHandlerCount() {
+ if (!isServerStarted() || this.server.getScheduler() == null) {
+ return 0;
+ }
+ return server.getScheduler().getActiveScanRpcHandlerCount();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 1387872..6005900 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -19,12 +19,9 @@
package org.apache.hadoop.hbase.ipc;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -39,7 +36,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
/**
@@ -67,6 +63,10 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final int numReadQueues;
private final int numScanQueues;
+ private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
+ private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
+ private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);
+
public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
@@ -117,11 +117,13 @@ public class RWQueueRpcExecutor extends RpcExecutor {
@Override
protected void startHandlers(final int port) {
- startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
- startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
+ startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port,
+ activeWriteHandlerCount);
+ startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port,
+ activeReadHandlerCount);
if (numScanQueues > 0) {
startHandlers(".scan", scanHandlersCount, queues, numWriteQueues + numReadQueues,
- numScanQueues, port);
+ numScanQueues, port, activeScanHandlerCount);
}
}
@@ -144,6 +146,55 @@ public class RWQueueRpcExecutor extends RpcExecutor {
return queue.offer(callTask);
}
+ @Override
+ public int getWriteQueueLength() {
+ int length = 0;
+ for (int i = 0; i < numWriteQueues; i++) {
+ length += queues.get(i).size();
+ }
+ return length;
+ }
+
+ @Override
+ public int getReadQueueLength() {
+ int length = 0;
+ for (int i = numWriteQueues; i < (numWriteQueues + numReadQueues); i++) {
+ length += queues.get(i).size();
+ }
+ return length;
+ }
+
+ @Override
+ public int getScanQueueLength() {
+ int length = 0;
+ for (int i = numWriteQueues + numReadQueues;
+ i < (numWriteQueues + numReadQueues + numScanQueues); i++) {
+ length += queues.get(i).size();
+ }
+ return length;
+ }
+
+ @Override
+ public int getActiveHandlerCount() {
+ return activeWriteHandlerCount.get() + activeReadHandlerCount.get()
+ + activeScanHandlerCount.get();
+ }
+
+ @Override
+ public int getActiveWriteHandlerCount() {
+ return activeWriteHandlerCount.get();
+ }
+
+ @Override
+ public int getActiveReadHandlerCount() {
+ return activeReadHandlerCount.get();
+ }
+
+ @Override
+ public int getActiveScanHandlerCount() {
+ return activeScanHandlerCount.get();
+ }
+
private boolean isWriteRequest(final RequestHeader header, final Message param) {
// TODO: Is there a better way to do this?
if (param instanceof MultiRequest) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index cab53dc..e41f4c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.ipc;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
@@ -175,19 +174,6 @@ public abstract class RpcExecutor {
}
}
- public int getActiveHandlerCount() {
- return activeHandlerCount.get();
- }
-
- /** Returns the length of the pending queue */
- public int getQueueLength() {
- int length = 0;
- for (final BlockingQueue<CallRunner> queue: queues) {
- length += queue.size();
- }
- return length;
- }
-
/** Add the request to the executor queue */
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
@@ -198,15 +184,15 @@ public abstract class RpcExecutor {
protected void startHandlers(final int port) {
List<BlockingQueue<CallRunner>> callQueues = getQueues();
- startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
+ startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port, activeHandlerCount);
}
/**
* Override if providing alternate Handler implementation.
*/
protected Handler getHandler(final String name, final double handlerFailureThreshhold,
- final BlockingQueue<CallRunner> q) {
- return new Handler(name, handlerFailureThreshhold, q);
+ final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
+ return new Handler(name, handlerFailureThreshhold, q, activeHandlerCount);
}
/**
@@ -214,7 +200,7 @@ public abstract class RpcExecutor {
*/
protected void startHandlers(final String nameSuffix, final int numHandlers,
final List<BlockingQueue<CallRunner>> callQueues, final int qindex, final int qsize,
- final int port) {
+ final int port, final AtomicInteger activeHandlerCount) {
final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
double handlerFailureThreshhold = conf == null ? 1.0 : conf.getDouble(
HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
@@ -223,7 +209,8 @@ public abstract class RpcExecutor {
final int index = qindex + (i % qsize);
String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue=" + index
+ ",port=" + port;
- Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
+ Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index),
+ activeHandlerCount);
handler.start();
LOG.debug("Started " + name);
handlers.add(handler);
@@ -241,12 +228,16 @@ public abstract class RpcExecutor {
final double handlerFailureThreshhold;
+ // metrics (shared with other handlers)
+ final AtomicInteger activeHandlerCount;
+
Handler(final String name, final double handlerFailureThreshhold,
- final BlockingQueue<CallRunner> q) {
+ final BlockingQueue<CallRunner> q, final AtomicInteger activeHandlerCount) {
super(name);
setDaemon(true);
this.q = q;
this.handlerFailureThreshhold = handlerFailureThreshhold;
+ this.activeHandlerCount = activeHandlerCount;
}
/**
@@ -282,7 +273,7 @@ public abstract class RpcExecutor {
MonitoredRPCHandler status = RpcServer.getStatus();
cr.setStatus(status);
try {
- activeHandlerCount.incrementAndGet();
+ this.activeHandlerCount.incrementAndGet();
cr.run();
} catch (Throwable e) {
if (e instanceof Error) {
@@ -305,7 +296,7 @@ public abstract class RpcExecutor {
LOG.warn("Handler exception " + StringUtils.stringifyException(e));
}
} finally {
- activeHandlerCount.decrementAndGet();
+ this.activeHandlerCount.decrementAndGet();
}
}
}
@@ -400,6 +391,43 @@ public abstract class RpcExecutor {
return numLifoModeSwitches.get();
}
+ public int getActiveHandlerCount() {
+ return activeHandlerCount.get();
+ }
+
+ public int getActiveWriteHandlerCount() {
+ return 0;
+ }
+
+ public int getActiveReadHandlerCount() {
+ return 0;
+ }
+
+ public int getActiveScanHandlerCount() {
+ return 0;
+ }
+
+ /** Returns the length of the pending queue */
+ public int getQueueLength() {
+ int length = 0;
+ for (final BlockingQueue<CallRunner> queue: queues) {
+ length += queue.size();
+ }
+ return length;
+ }
+
+ public int getReadQueueLength() {
+ return 0;
+ }
+
+ public int getScanQueueLength() {
+ return 0;
+ }
+
+ public int getWriteQueueLength() {
+ return 0;
+ }
+
public String getName() {
return this.name;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index 91c152b..fff8373 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -89,4 +89,22 @@ public abstract class RpcScheduler {
* in the period of overloade we serve last requests first); returns 0 otherwise.
*/
public abstract long getNumLifoModeSwitches();
+
+ /** Retrieves length of the write queue for metrics when using RWQueueRpcExecutor. */
+ public abstract int getWriteQueueLength();
+
+ /** Retrieves length of the read queue for metrics when using RWQueueRpcExecutor. */
+ public abstract int getReadQueueLength();
+
+ /** Retrieves length of the scan queue for metrics when using RWQueueRpcExecutor. */
+ public abstract int getScanQueueLength();
+
+ /** Retrieves the number of active write rpc handlers when using RWQueueRpcExecutor. */
+ public abstract int getActiveWriteRpcHandlerCount();
+
+ /** Retrieves the number of active read rpc handlers when using RWQueueRpcExecutor. */
+ public abstract int getActiveReadRpcHandlerCount();
+
+ /** Retrieves the number of active scan rpc handlers when using RWQueueRpcExecutor. */
+ public abstract int getActiveScanRpcHandlerCount();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 9037f07..3aa486e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -17,11 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;
-
-import java.util.Comparator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -31,7 +26,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
/**
* The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
@@ -205,5 +199,35 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public long getNumLifoModeSwitches() {
return callExecutor.getNumLifoModeSwitches();
}
+
+ @Override
+ public int getWriteQueueLength() {
+ return callExecutor.getWriteQueueLength();
+ }
+
+ @Override
+ public int getReadQueueLength() {
+ return callExecutor.getReadQueueLength();
+ }
+
+ @Override
+ public int getScanQueueLength() {
+ return callExecutor.getScanQueueLength();
+ }
+
+ @Override
+ public int getActiveWriteRpcHandlerCount() {
+ return callExecutor.getActiveWriteHandlerCount();
+ }
+
+ @Override
+ public int getActiveReadRpcHandlerCount() {
+ return callExecutor.getActiveReadHandlerCount();
+ }
+
+ @Override
+ public int getActiveScanRpcHandlerCount() {
+ return callExecutor.getActiveScanHandlerCount();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
index c395c41..3c8f114 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
@@ -73,4 +73,34 @@ public class DelegatingRpcScheduler extends RpcScheduler {
public long getNumLifoModeSwitches() {
return delegate.getNumLifoModeSwitches();
}
+
+ @Override
+ public int getWriteQueueLength() {
+ return 0;
+ }
+
+ @Override
+ public int getReadQueueLength() {
+ return 0;
+ }
+
+ @Override
+ public int getScanQueueLength() {
+ return 0;
+ }
+
+ @Override
+ public int getActiveWriteRpcHandlerCount() {
+ return 0;
+ }
+
+ @Override
+ public int getActiveReadRpcHandlerCount() {
+ return 0;
+ }
+
+ @Override
+ public int getActiveScanRpcHandlerCount() {
+ return 0;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
index b001d74..3b787a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
@@ -58,4 +58,34 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper{
public long getNumLifoModeSwitches() {
return 5;
}
+
+ @Override
+ public int getWriteQueueLength() {
+ return 50;
+ }
+
+ @Override
+ public int getReadQueueLength() {
+ return 50;
+ }
+
+ @Override
+ public int getScanQueueLength() {
+ return 2;
+ }
+
+ @Override
+ public int getActiveWriteRpcHandlerCount() {
+ return 50;
+ }
+
+ @Override
+ public int getActiveReadRpcHandlerCount() {
+ return 50;
+ }
+
+ @Override
+ public int getActiveScanRpcHandlerCount() {
+ return 6;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cc03f7ad/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
index 4de618f..a0769da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
@@ -46,7 +46,6 @@ public class TestRpcMetrics {
MetricsHBaseServer rsMetrics = new MetricsHBaseServer("HRegionServer", new MetricsHBaseServerWrapperStub());
MetricsHBaseServerSource rsSource = rsMetrics.getMetricsSource();
-
assertEquals("master", masterSource.getMetricsContext());
assertEquals("regionserver", rsSource.getMetricsContext());
@@ -71,6 +70,12 @@ public class TestRpcMetrics {
HELPER.assertGauge("numCallsInPriorityQueue", 104, serverSource);
HELPER.assertGauge("numOpenConnections", 105, serverSource);
HELPER.assertGauge("numActiveHandler", 106, serverSource);
+ HELPER.assertGauge("numActiveWriteHandler", 50, serverSource);
+ HELPER.assertGauge("numActiveReadHandler", 50, serverSource);
+ HELPER.assertGauge("numActiveScanHandler", 6, serverSource);
+ HELPER.assertGauge("numCallsInWriteQueue", 50, serverSource);
+ HELPER.assertGauge("numCallsInReadQueue", 50, serverSource);
+ HELPER.assertGauge("numCallsInScanQueue", 2, serverSource);
}
/**