You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/31 01:37:29 UTC

svn commit: r1238114 - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc: HBaseRpcMetrics.java HBaseServer.java RpcServer.java

Author: tedyu
Date: Tue Jan 31 00:37:29 2012
New Revision: 1238114

URL: http://svn.apache.org/viewvc?rev=1238114&view=rev
Log:
HBASE-5297 Update metrics numOpenConnections and callQueueLen directly in HBaseServer (Scott Chen)

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=1238114&r1=1238113&r2=1238114&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Tue Jan 31 00:37:29 2012
@@ -47,13 +47,10 @@ public class HBaseRpcMetrics implements 
   public static final String NAME_DELIM = "$";
   private final MetricsRegistry registry = new MetricsRegistry();
   private final MetricsRecord metricsRecord;
-  private final RpcServer myServer;
   private static Log LOG = LogFactory.getLog(HBaseRpcMetrics.class);
   private final HBaseRPCStatistics rpcStatistics;
 
-  public HBaseRpcMetrics(String hostName, String port,
-      final RpcServer server) {
-    myServer = server;
+  public HBaseRpcMetrics(String hostName, String port) {
     MetricsContext context = MetricsUtil.getContext("rpc");
     metricsRecord = MetricsUtil.createRecord(context, "metrics");
 
@@ -89,6 +86,8 @@ public class HBaseRpcMetrics implements 
           new MetricsIntValue("NumOpenConnections", registry);
   public final MetricsIntValue callQueueLen =
           new MetricsIntValue("callQueueLen", registry);
+  public final MetricsIntValue priorityCallQueueLen =
+          new MetricsIntValue("priorityCallQueueLen", registry);
   public final MetricsTimeVaryingInt authenticationFailures = 
           new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry);
   public final MetricsTimeVaryingInt authenticationSuccesses =
@@ -203,14 +202,9 @@ public class HBaseRpcMetrics implements 
    * Push the metrics to the monitoring subsystem on doUpdate() call.
    */
   public void doUpdates(final MetricsContext context) {
-    synchronized (this) {
-      // ToFix - fix server to use the following two metrics directly so
-      // the metrics do not have be copied here.
-      numOpenConnections.set(myServer.getNumOpenConnections());
-      callQueueLen.set(myServer.getCallQueueLen());
-      for (MetricsBase m : registry.getMetricsList()) {
-        m.pushMetric(metricsRecord);
-      }
+    // Both getMetricsList() and pushMetric() are thread-safe
+    for (MetricsBase m : registry.getMetricsList()) {
+      m.pushMetric(metricsRecord);
     }
     metricsRecord.update();
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1238114&r1=1238113&r2=1238114&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Jan 31 00:37:29 2012
@@ -689,6 +689,7 @@ public abstract class HBaseServer implem
           reader.finishAdd();
         }
       }
+      rpcMetrics.numOpenConnections.set(numConnections);
     }
 
     void doRead(SelectionKey key) throws InterruptedException {
@@ -1252,8 +1253,10 @@ public abstract class HBaseServer implem
 
       if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
         priorityCallQueue.put(call);
+        updateCallQueueLenMetrics(priorityCallQueue);
       } else {
         callQueue.put(call);              // queue the call; maybe blocked here
+        updateCallQueueLenMetrics(callQueue);
       }
     }
 
@@ -1270,6 +1273,20 @@ public abstract class HBaseServer implem
     }
   }
 
+  /**
+   * Reports length of the call queue to HBaseRpcMetrics.
+   * @param queue Which queue to report
+   */
+  private void updateCallQueueLenMetrics(BlockingQueue<Call> queue) {
+    if (queue == callQueue) {
+      rpcMetrics.callQueueLen.set(callQueue.size());
+    } else if (queue == priorityCallQueue) {
+      rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size());
+    } else {
+      LOG.warn("Unknown call queue");
+    }
+  }
+
   /** Handles queued calls . */
   private class Handler extends Thread {
     private final BlockingQueue<Call> myCallQueue;
@@ -1297,6 +1314,7 @@ public abstract class HBaseServer implem
         try {
           status.pause("Waiting for a call");
           Call call = myCallQueue.take(); // pop the queue; maybe blocked here
+          updateCallQueueLenMetrics(myCallQueue);
           status.setStatus("Setting up call");
           status.setConnection(call.connection.getHostAddress(), 
               call.connection.getRemotePort());
@@ -1433,8 +1451,8 @@ public abstract class HBaseServer implem
     // Start the listener here and let it bind to the port
     listener = new Listener();
     this.port = listener.getAddress().getPort();
-    this.rpcMetrics = new HBaseRpcMetrics(serverName,
-                          Integer.toString(this.port), this);
+    this.rpcMetrics = new HBaseRpcMetrics(
+        serverName, Integer.toString(this.port));
     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
     this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
 
@@ -1492,10 +1510,12 @@ public abstract class HBaseServer implem
 
   protected void closeConnection(Connection connection) {
     synchronized (connectionList) {
-      if (connectionList.remove(connection))
+      if (connectionList.remove(connection)) {
         numConnections--;
+      }
     }
     connection.close();
+    rpcMetrics.numOpenConnections.set(numConnections);
   }
 
   /** Sets the socket buffer size used for responding to RPCs.
@@ -1593,24 +1613,6 @@ public abstract class HBaseServer implem
   }
 
   /**
-   * The number of open RPC conections
-   * @return the number of open rpc connections
-   */
-  @Override
-  public int getNumOpenConnections() {
-    return numConnections;
-  }
-
-  /**
-   * The number of rpc calls in the queue.
-   * @return The number of rpc calls in the queue.
-   */
-  @Override
-  public int getCallQueueLen() {
-    return callQueue.size();
-  }
-
-  /**
    * Set the handler for calling out of RPC for error conditions.
    * @param handler the handler implementation
    */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1238114&r1=1238113&r2=1238114&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Tue Jan 31 00:37:29 2012
@@ -52,10 +52,6 @@ public interface RpcServer {
       Writable param, long receiveTime, MonitoredRPCHandler status)
       throws IOException;
 
-  int getNumOpenConnections();
-
-  int getCallQueueLen();
-
   void setErrorHandler(HBaseRPCErrorHandler handler);
 
   void setQosFunction(Function<Writable, Integer> newFunc);