Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:54:31 UTC

svn commit: r1077234 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/ipc/ core/org/apache/hadoop/ipc/metrics/ test/org/apache/hadoop/ipc/

Author: omalley
Date: Fri Mar  4 03:54:30 2011
New Revision: 1077234

URL: http://svn.apache.org/viewvc?rev=1077234&view=rev
Log:
commit f20e8090dda082e7c6fa03b008ee04da2435ad42
Author: Suresh Srinivas <su...@yahoo-inc.com>
Date:   Fri Feb 26 16:50:29 2010 -0800

    HADOOP-6599 from https://issues.apache.org/jira/secure/attachment/12437251/hadoop-6599.rel20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HADOOP-6599. Split existing RpcMetrics with summary in RpcMetrics and
    +    details information in RpcDetailedMetrics. (suresh)
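
For context (illustration only, not part of the commit): after this split a server
registers two JMX MBeans, named as described in the javadocs below --
"hadoop:service=<serviceName>,name=RpcActivityForPort<port>" for the summary counters
and "hadoop:service=<serviceName>,name=RpcDetailedActivityForPort<port>" for the
per-method timings. A minimal, hypothetical client that dumps both (assuming the
MBeans are reachable on the local platform MBeanServer) might look like:

    // Hypothetical monitoring sketch -- not part of this patch.
    import java.lang.management.ManagementFactory;
    import javax.management.MBeanAttributeInfo;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class DumpRpcMBeans {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        for (ObjectName name : mbs.queryNames(new ObjectName("hadoop:*"), null)) {
          String n = name.getKeyProperty("name");
          // keep both RpcActivityForPort<port> and RpcDetailedActivityForPort<port>
          if (n == null || !n.contains("ActivityForPort")) continue;
          System.out.println(name);
          for (MBeanAttributeInfo attr : mbs.getMBeanInfo(name).getAttributes()) {
            System.out.println("  " + attr.getName() + " = "
                + mbs.getAttribute(name, attr.getName()));
          }
        }
      }
    }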

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java Fri Mar  4 03:54:30 2011
@@ -528,15 +528,15 @@ public class RPC {
         rpcMetrics.rpcProcessingTime.inc(processingTime);
 
         MetricsTimeVaryingRate m =
-         (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName());
+         (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(call.getMethodName());
       	if (m == null) {
       	  try {
       	    m = new MetricsTimeVaryingRate(call.getMethodName(),
-      	                                        rpcMetrics.registry);
+      	                                        rpcDetailedMetrics.registry);
       	  } catch (IllegalArgumentException iae) {
       	    // the metrics has been registered; re-fetch the handle
       	    LOG.info("Error register " + call.getMethodName(), iae);
-      	    m = (MetricsTimeVaryingRate) rpcMetrics.registry.get(
+      	    m = (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(
       	        call.getMethodName());
       	  }
       	}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java Fri Mar  4 03:54:30 2011
@@ -61,6 +61,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SaslRpcServer;
@@ -171,7 +172,8 @@ public abstract class Server {
                                                   // connections to nuke
                                                   //during a cleanup
   
-  protected RpcMetrics  rpcMetrics;
+  protected RpcMetrics rpcMetrics;
+  protected RpcDetailedMetrics rpcDetailedMetrics;
   
   private Configuration conf;
   private SecretManager<TokenIdentifier> secretManager;
@@ -1302,6 +1304,8 @@ public abstract class Server {
     this.port = listener.getAddress().getPort();    
     this.rpcMetrics = new RpcMetrics(serverName,
                           Integer.toString(this.port), this);
+    this.rpcDetailedMetrics = new RpcDetailedMetrics(serverName,
+                            Integer.toString(this.port));
     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
 
 
@@ -1411,6 +1415,9 @@ public abstract class Server {
     if (this.rpcMetrics != null) {
       this.rpcMetrics.shutdown();
     }
+    if (this.rpcDetailedMetrics != null) {
+      this.rpcDetailedMetrics.shutdown();
+    }
   }
 
   /** Wait for the server to be stopped.
@@ -1501,11 +1508,15 @@ public abstract class Server {
    *
    * @see WritableByteChannel#write(ByteBuffer)
    */
-  private static int channelWrite(WritableByteChannel channel, 
-                                  ByteBuffer buffer) throws IOException {
+  private int channelWrite(WritableByteChannel channel, 
+                           ByteBuffer buffer) throws IOException {
     
-    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
-           channel.write(buffer) : channelIO(null, channel, buffer);
+    int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+                 channel.write(buffer) : channelIO(null, channel, buffer);
+    if (count > 0) {
+      rpcMetrics.sentBytes.inc(count);
+    }
+    return count;
   }
   
   
@@ -1517,11 +1528,15 @@ public abstract class Server {
    * 
    * @see ReadableByteChannel#read(ByteBuffer)
    */
-  private static int channelRead(ReadableByteChannel channel, 
-                                 ByteBuffer buffer) throws IOException {
+  private int channelRead(ReadableByteChannel channel, 
+                          ByteBuffer buffer) throws IOException {
     
-    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
-           channel.read(buffer) : channelIO(channel, null, buffer);
+    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+                channel.read(buffer) : channelIO(channel, null, buffer);
+    if (count > 0) {
+      rpcMetrics.receivedBytes.inc(count);
+    }
+    return count;
   }
   
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java Fri Mar  4 03:54:30 2011
@@ -55,7 +55,7 @@ import org.apache.hadoop.metrics.util.Me
  */
 
 public class RpcActivityMBean extends MetricsDynamicMBeanBase {
-  final private ObjectName mbeanName;
+  private final ObjectName mbeanName;
 
   /**
    * 
@@ -63,9 +63,8 @@ public class RpcActivityMBean extends Me
    * @param serviceName - the service name for the rpc service 
    * @param port - the rpc port.
    */
-  public RpcActivityMBean(final MetricsRegistry mr, final String serviceName, final String port) {
-
-    
+  public RpcActivityMBean(final MetricsRegistry mr, final String serviceName,
+      final String port) {
     super(mr, "Rpc layer statistics");
     mbeanName = MBeanUtil.registerMBean(serviceName,
           "RpcActivityForPort" + port, this);
@@ -76,5 +75,4 @@ public class RpcActivityMBean extends Me
     if (mbeanName != null)
       MBeanUtil.unregisterMBean(mbeanName);
   }
-
 }

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java?rev=1077234&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java Fri Mar  4 03:54:30 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+/**
+ * 
+ * This is the JMX MBean for reporting the RPC layer Activity. The MBean is
+ * register using the name
+ * "hadoop:service=<RpcServiceName>,name=RpcDetailedActivityForPort<port>"
+ * 
+ * Many of the activity metrics are sampled and averaged on an interval which
+ * can be specified in the metrics config file.
+ * <p>
+ * For the metrics that are sampled and averaged, one must specify a metrics
+ * context that does periodic update calls. Most metrics contexts do. The
+ * default Null metrics context however does NOT. So if you aren't using any
+ * other metrics context then you can turn on the viewing and averaging of
+ * sampled metrics by specifying the following two lines in the
+ * hadoop-metrics.properties file:
+ * 
+ * <pre>
+ *        rpc.class=org.apache.hadoop.metrics.spi.NullContextWithUpdateThread
+ *        rpc.period=10
+ * </pre>
+ *<p>
+ * Note that the metrics are collected regardless of the context used. The
+ * context with the update thread is used to average the data periodically
+ * 
+ * Impl details: We use a dynamic mbean that gets the list of the metrics from
+ * the metrics registry passed as an argument to the constructor
+ */
+public class RpcDetailedActivityMBean extends MetricsDynamicMBeanBase {
+  private final ObjectName mbeanName;
+
+  /**
+   * @param mr - the metrics registry that has all the metrics
+   * @param serviceName - the service name for the rpc service
+   * @param port - the rpc port.
+   */
+  public RpcDetailedActivityMBean(final MetricsRegistry mr,
+      final String serviceName, final String port) {
+    super(mr, "Rpc layer detailed statistics");
+    mbeanName = MBeanUtil.registerMBean(serviceName,
+        "RpcDetailedActivityForPort" + port, this);
+  }
+
+  public void shutdown() {
+    if (mbeanName != null)
+      MBeanUtil.unregisterMBean(mbeanName);
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java?rev=1077234&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java Fri Mar  4 03:54:30 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/**
+ * 
+ * This class is for maintaining  the various RPC method related statistics
+ * and publishing them through the metrics interfaces.
+ * This also registers the JMX MBean for RPC.
+ */
+public class RpcDetailedMetrics implements Updater {
+  public final MetricsRegistry registry = new MetricsRegistry();
+  private final MetricsRecord metricsRecord;
+  private static final Log LOG = LogFactory.getLog(RpcDetailedMetrics.class);
+  RpcDetailedActivityMBean rpcMBean;
+  
+  /**
+   * Statically added metrics to expose at least one metrics, without
+   * which other dynamically added metrics are not exposed over JMX.
+   */
+  final MetricsTimeVaryingRate getProtocolVersion = 
+    new MetricsTimeVaryingRate("getProtocolVersion", registry);
+  
+  public RpcDetailedMetrics(final String hostName, final String port) {
+    MetricsContext context = MetricsUtil.getContext("rpc");
+    metricsRecord = MetricsUtil.createRecord(context, "detailed-metrics");
+
+    metricsRecord.setTag("port", port);
+
+    LOG.info("Initializing RPC Metrics with hostName=" 
+        + hostName + ", port=" + port);
+
+    context.registerUpdater(this);
+    
+    // Need to clean up the interface to RpcMgt - don't need both metrics and server params
+    rpcMBean = new RpcDetailedActivityMBean(registry, hostName, port);
+  }
+  
+  
+  /**
+   * Push the metrics to the monitoring subsystem on doUpdate() call.
+   */
+  public void doUpdates(final MetricsContext context) {
+    
+    synchronized (this) {
+      for (MetricsBase m : registry.getMetricsList()) {
+        m.pushMetric(metricsRecord);
+      }
+    }
+    metricsRecord.update();
+  }
+
+  public void shutdown() {
+    if (rpcMBean != null) 
+      rpcMBean.shutdown();
+  }
+}
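
For illustration only (not in the patch): pieced together from the RPC.java hunk
above and the TestRPC.java hunk below, the intended usage of RpcDetailedMetrics is
roughly the following; the service name "myService", port "9000", method name "echo"
and the 12 ms timing are made-up values.

    import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
    import org.apache.hadoop.metrics.spi.NullContext;
    import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;

    public class RpcDetailedMetricsSketch {
      public static void main(String[] args) {
        RpcDetailedMetrics detailed = new RpcDetailedMetrics("myService", "9000");

        // Get-or-create the per-method rate, as RPC.Server.call() does above.
        MetricsTimeVaryingRate m =
            (MetricsTimeVaryingRate) detailed.registry.get("echo");
        if (m == null) {
          try {
            m = new MetricsTimeVaryingRate("echo", detailed.registry);
          } catch (IllegalArgumentException iae) {
            // Another handler registered it first; re-fetch the handle.
            m = (MetricsTimeVaryingRate) detailed.registry.get("echo");
          }
        }
        m.inc(12);                              // one call that took 12 ms

        detailed.doUpdates(new NullContext());  // push, as TestRPC.java does below
        System.out.println(m.getPreviousIntervalNumOps());  // prints 1

        detailed.shutdown();                    // unregister the MBean, as Server.stop() does
      }
    }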

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java Fri Mar  4 03:54:30 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.metrics.util.Me
 import org.apache.hadoop.metrics.util.MetricsIntValue;
 import org.apache.hadoop.metrics.util.MetricsRegistry;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 
 /**
@@ -43,13 +44,14 @@ import org.apache.hadoop.metrics.util.Me
  *
  */
 public class RpcMetrics implements Updater {
-  public MetricsRegistry registry = new MetricsRegistry();
-  private MetricsRecord metricsRecord;
-  private Server myServer;
-  private static Log LOG = LogFactory.getLog(RpcMetrics.class);
+  private final MetricsRegistry registry = new MetricsRegistry();
+  private final MetricsRecord metricsRecord;
+  private final Server myServer;
+  private static final Log LOG = LogFactory.getLog(RpcMetrics.class);
   RpcActivityMBean rpcMBean;
   
-  public RpcMetrics(String hostName, String port, Server server) {
+  public RpcMetrics(final String hostName, final String port,
+      final Server server) {
     myServer = server;
     MetricsContext context = MetricsUtil.getContext("rpc");
     metricsRecord = MetricsUtil.createRecord(context, "metrics");
@@ -72,26 +74,30 @@ public class RpcMetrics implements Updat
    *  -they can also be read directly - e.g. JMX does this.
    */
 
-  public MetricsTimeVaryingRate rpcQueueTime =
+  public final MetricsTimeVaryingLong receivedBytes = 
+         new MetricsTimeVaryingLong("ReceivedBytes", registry);
+  public final MetricsTimeVaryingLong sentBytes = 
+         new MetricsTimeVaryingLong("SentBytes", registry);
+  public final MetricsTimeVaryingRate rpcQueueTime =
           new MetricsTimeVaryingRate("RpcQueueTime", registry);
   public MetricsTimeVaryingRate rpcProcessingTime =
           new MetricsTimeVaryingRate("RpcProcessingTime", registry);
-  public MetricsIntValue numOpenConnections = 
+  public final MetricsIntValue numOpenConnections = 
           new MetricsIntValue("NumOpenConnections", registry);
-  public MetricsIntValue callQueueLen = 
+  public final MetricsIntValue callQueueLen = 
           new MetricsIntValue("callQueueLen", registry);
-  public MetricsTimeVaryingInt authenticationFailures = 
+  public final MetricsTimeVaryingInt authenticationFailures = 
           new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry);
-  public MetricsTimeVaryingInt authenticationSuccesses = 
+  public final MetricsTimeVaryingInt authenticationSuccesses = 
           new MetricsTimeVaryingInt("rpcAuthenticationSuccesses", registry);
-  public MetricsTimeVaryingInt authorizationFailures = 
+  public final MetricsTimeVaryingInt authorizationFailures = 
           new MetricsTimeVaryingInt("rpcAuthorizationFailures", registry);
-  public MetricsTimeVaryingInt authorizationSuccesses = 
+  public final MetricsTimeVaryingInt authorizationSuccesses = 
          new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
   /**
    * Push the metrics to the monitoring subsystem on doUpdate() call.
    */
-  public void doUpdates(MetricsContext context) {
+  public void doUpdates(final MetricsContext context) {
     
     synchronized (this) {
       // ToFix - fix server to use the following two metrics directly so
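
As an aside (illustration only, not part of the patch): the new ReceivedBytes and
SentBytes counters are interval-based, so increments accumulate until the next push
and are then exposed as the previous interval's total. A minimal sketch, using only
calls already visible in this commit:

    import org.apache.hadoop.metrics.MetricsContext;
    import org.apache.hadoop.metrics.MetricsRecord;
    import org.apache.hadoop.metrics.MetricsUtil;
    import org.apache.hadoop.metrics.util.MetricsRegistry;
    import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;

    public class SentBytesSketch {
      public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry();
        MetricsTimeVaryingLong sentBytes =
            new MetricsTimeVaryingLong("SentBytes", registry);

        // channelWrite() above does this after every successful write:
        sentBytes.inc(1024);
        sentBytes.inc(512);

        // doUpdates() pushes each metric, closing the current interval:
        MetricsContext context = MetricsUtil.getContext("rpc");
        MetricsRecord record = MetricsUtil.createRecord(context, "metrics");
        sentBytes.pushMetric(record);

        // TestRPC.java below reads the closed interval's total (1536 here):
        System.out.println(sentBytes.getPreviousIntervalValue());
      }
    }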

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Mar  4 03:54:30 2011
@@ -33,6 +33,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.spi.NullContext;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -246,7 +249,26 @@ public class TestRPC extends TestCase {
 
     stringResult = proxy.echo((String)null);
     assertEquals(stringResult, null);
-
+    
+    // Check rpcMetrics 
+    server.rpcMetrics.doUpdates(new NullContext());
+    
+    // Number 4 includes getProtocolVersion()
+    assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
+    assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0);
+    assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0);
+    
+    // Number of calls to echo method should be 2
+    server.rpcDetailedMetrics.doUpdates(new NullContext());
+    MetricsTimeVaryingRate metrics = 
+      (MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("echo");
+    assertEquals(2, metrics.getPreviousIntervalNumOps());
+    
+    // Number of calls to ping method should be 1
+    metrics = 
+      (MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("ping");
+    assertEquals(1, metrics.getPreviousIntervalNumOps());
+    
     String[] stringResults = proxy.echo(new String[]{"foo","bar"});
     assertTrue(Arrays.equals(stringResults, new String[]{"foo","bar"}));