You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/31 01:37:29 UTC
svn commit: r1238114 - in
/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc:
HBaseRpcMetrics.java HBaseServer.java RpcServer.java
Author: tedyu
Date: Tue Jan 31 00:37:29 2012
New Revision: 1238114
URL: http://svn.apache.org/viewvc?rev=1238114&view=rev
Log:
HBASE-5297 Update metrics numOpenConnections and callQueueLen directly in HBaseServer (Scott Chen)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1238114&r1=1238113&r2=1238114&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Tue Jan 31 00:37:29 2012
@@ -47,13 +47,10 @@ public class HBaseRpcMetrics implements
public static final String NAME_DELIM = "$";
private final MetricsRegistry registry = new MetricsRegistry();
private final MetricsRecord metricsRecord;
- private final RpcServer myServer;
private static Log LOG = LogFactory.getLog(HBaseRpcMetrics.class);
private final HBaseRPCStatistics rpcStatistics;
- public HBaseRpcMetrics(String hostName, String port,
- final RpcServer server) {
- myServer = server;
+ public HBaseRpcMetrics(String hostName, String port) {
MetricsContext context = MetricsUtil.getContext("rpc");
metricsRecord = MetricsUtil.createRecord(context, "metrics");
@@ -89,6 +86,8 @@ public class HBaseRpcMetrics implements
new MetricsIntValue("NumOpenConnections", registry);
public final MetricsIntValue callQueueLen =
new MetricsIntValue("callQueueLen", registry);
+ public final MetricsIntValue priorityCallQueueLen =
+ new MetricsIntValue("priorityCallQueueLen", registry);
public final MetricsTimeVaryingInt authenticationFailures =
new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry);
public final MetricsTimeVaryingInt authenticationSuccesses =
@@ -203,14 +202,9 @@ public class HBaseRpcMetrics implements
* Push the metrics to the monitoring subsystem on doUpdate() call.
*/
public void doUpdates(final MetricsContext context) {
- synchronized (this) {
- // ToFix - fix server to use the following two metrics directly so
- // the metrics do not have be copied here.
- numOpenConnections.set(myServer.getNumOpenConnections());
- callQueueLen.set(myServer.getCallQueueLen());
- for (MetricsBase m : registry.getMetricsList()) {
- m.pushMetric(metricsRecord);
- }
+ // Both getMetricsList() and pushMetric() are thread-safe
+ for (MetricsBase m : registry.getMetricsList()) {
+ m.pushMetric(metricsRecord);
}
metricsRecord.update();
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1238114&r1=1238113&r2=1238114&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Jan 31 00:37:29 2012
@@ -689,6 +689,7 @@ public abstract class HBaseServer implem
reader.finishAdd();
}
}
+ rpcMetrics.numOpenConnections.set(numConnections);
}
void doRead(SelectionKey key) throws InterruptedException {
@@ -1252,8 +1253,10 @@ public abstract class HBaseServer implem
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
priorityCallQueue.put(call);
+ updateCallQueueLenMetrics(priorityCallQueue);
} else {
callQueue.put(call); // queue the call; maybe blocked here
+ updateCallQueueLenMetrics(callQueue);
}
}
@@ -1270,6 +1273,20 @@ public abstract class HBaseServer implem
}
}
+ /**
+ * Reports length of the call queue to HBaseRpcMetrics.
+ * @param queue Which queue to report
+ */
+ private void updateCallQueueLenMetrics(BlockingQueue<Call> queue) {
+ if (queue == callQueue) {
+ rpcMetrics.callQueueLen.set(callQueue.size());
+ } else if (queue == priorityCallQueue) {
+ rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size());
+ } else {
+ LOG.warn("Unknown call queue");
+ }
+ }
+
/** Handles queued calls . */
private class Handler extends Thread {
private final BlockingQueue<Call> myCallQueue;
@@ -1297,6 +1314,7 @@ public abstract class HBaseServer implem
try {
status.pause("Waiting for a call");
Call call = myCallQueue.take(); // pop the queue; maybe blocked here
+ updateCallQueueLenMetrics(myCallQueue);
status.setStatus("Setting up call");
status.setConnection(call.connection.getHostAddress(),
call.connection.getRemotePort());
@@ -1433,8 +1451,8 @@ public abstract class HBaseServer implem
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
- this.rpcMetrics = new HBaseRpcMetrics(serverName,
- Integer.toString(this.port), this);
+ this.rpcMetrics = new HBaseRpcMetrics(
+ serverName, Integer.toString(this.port));
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
@@ -1492,10 +1510,12 @@ public abstract class HBaseServer implem
protected void closeConnection(Connection connection) {
synchronized (connectionList) {
- if (connectionList.remove(connection))
+ if (connectionList.remove(connection)) {
numConnections--;
+ }
}
connection.close();
+ rpcMetrics.numOpenConnections.set(numConnections);
}
/** Sets the socket buffer size used for responding to RPCs.
@@ -1593,24 +1613,6 @@ public abstract class HBaseServer implem
}
/**
- * The number of open RPC conections
- * @return the number of open rpc connections
- */
- @Override
- public int getNumOpenConnections() {
- return numConnections;
- }
-
- /**
- * The number of rpc calls in the queue.
- * @return The number of rpc calls in the queue.
- */
- @Override
- public int getCallQueueLen() {
- return callQueue.size();
- }
-
- /**
* Set the handler for calling out of RPC for error conditions.
* @param handler the handler implementation
*/
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1238114&r1=1238113&r2=1238114&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Tue Jan 31 00:37:29 2012
@@ -52,10 +52,6 @@ public interface RpcServer {
Writable param, long receiveTime, MonitoredRPCHandler status)
throws IOException;
- int getNumOpenConnections();
-
- int getCallQueueLen();
-
void setErrorHandler(HBaseRPCErrorHandler handler);
void setQosFunction(Function<Writable, Integer> newFunc);