You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:54:31 UTC
svn commit: r1077234 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/ipc/ core/org/apache/hadoop/ipc/metrics/
test/org/apache/hadoop/ipc/
Author: omalley
Date: Fri Mar 4 03:54:30 2011
New Revision: 1077234
URL: http://svn.apache.org/viewvc?rev=1077234&view=rev
Log:
commit f20e8090dda082e7c6fa03b008ee04da2435ad42
Author: Suresh Srinivas <su...@yahoo-inc.com>
Date: Fri Feb 26 16:50:29 2010 -0800
HDFS-6599 from https://issues.apache.org/jira/secure/attachment/12437251/hadoop-6599.rel20.patch
+++ b/YAHOO-CHANGES.txt
+ HADOOP-6599. Split existing RpcMetrics with summary in RpcMetrics and
+ details information in RpcDetailedMetrics. (suresh)
Added:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/RPC.java Fri Mar 4 03:54:30 2011
@@ -528,15 +528,15 @@ public class RPC {
rpcMetrics.rpcProcessingTime.inc(processingTime);
MetricsTimeVaryingRate m =
- (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName());
+ (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(call.getMethodName());
if (m == null) {
try {
m = new MetricsTimeVaryingRate(call.getMethodName(),
- rpcMetrics.registry);
+ rpcDetailedMetrics.registry);
} catch (IllegalArgumentException iae) {
// the metrics has been registered; re-fetch the handle
LOG.info("Error register " + call.getMethodName(), iae);
- m = (MetricsTimeVaryingRate) rpcMetrics.registry.get(
+ m = (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(
call.getMethodName());
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java Fri Mar 4 03:54:30 2011
@@ -61,6 +61,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslRpcServer;
@@ -171,7 +172,8 @@ public abstract class Server {
// connections to nuke
//during a cleanup
- protected RpcMetrics rpcMetrics;
+ protected RpcMetrics rpcMetrics;
+ protected RpcDetailedMetrics rpcDetailedMetrics;
private Configuration conf;
private SecretManager<TokenIdentifier> secretManager;
@@ -1302,6 +1304,8 @@ public abstract class Server {
this.port = listener.getAddress().getPort();
this.rpcMetrics = new RpcMetrics(serverName,
Integer.toString(this.port), this);
+ this.rpcDetailedMetrics = new RpcDetailedMetrics(serverName,
+ Integer.toString(this.port));
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
@@ -1411,6 +1415,9 @@ public abstract class Server {
if (this.rpcMetrics != null) {
this.rpcMetrics.shutdown();
}
+ if (this.rpcDetailedMetrics != null) {
+ this.rpcDetailedMetrics.shutdown();
+ }
}
/** Wait for the server to be stopped.
@@ -1501,11 +1508,15 @@ public abstract class Server {
*
* @see WritableByteChannel#write(ByteBuffer)
*/
- private static int channelWrite(WritableByteChannel channel,
- ByteBuffer buffer) throws IOException {
+ private int channelWrite(WritableByteChannel channel,
+ ByteBuffer buffer) throws IOException {
- return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
- channel.write(buffer) : channelIO(null, channel, buffer);
+ int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+ channel.write(buffer) : channelIO(null, channel, buffer);
+ if (count > 0) {
+ rpcMetrics.sentBytes.inc(count);
+ }
+ return count;
}
@@ -1517,11 +1528,15 @@ public abstract class Server {
*
* @see ReadableByteChannel#read(ByteBuffer)
*/
- private static int channelRead(ReadableByteChannel channel,
- ByteBuffer buffer) throws IOException {
+ private int channelRead(ReadableByteChannel channel,
+ ByteBuffer buffer) throws IOException {
- return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
- channel.read(buffer) : channelIO(channel, null, buffer);
+ int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+ channel.read(buffer) : channelIO(channel, null, buffer);
+ if (count > 0) {
+ rpcMetrics.receivedBytes.inc(count);
+ }
+ return count;
}
/**
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java Fri Mar 4 03:54:30 2011
@@ -55,7 +55,7 @@ import org.apache.hadoop.metrics.util.Me
*/
public class RpcActivityMBean extends MetricsDynamicMBeanBase {
- final private ObjectName mbeanName;
+ private final ObjectName mbeanName;
/**
*
@@ -63,9 +63,8 @@ public class RpcActivityMBean extends Me
* @param serviceName - the service name for the rpc service
* @param port - the rpc port.
*/
- public RpcActivityMBean(final MetricsRegistry mr, final String serviceName, final String port) {
-
-
+ public RpcActivityMBean(final MetricsRegistry mr, final String serviceName,
+ final String port) {
super(mr, "Rpc layer statistics");
mbeanName = MBeanUtil.registerMBean(serviceName,
"RpcActivityForPort" + port, this);
@@ -76,5 +75,4 @@ public class RpcActivityMBean extends Me
if (mbeanName != null)
MBeanUtil.unregisterMBean(mbeanName);
}
-
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java?rev=1077234&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java Fri Mar 4 03:54:30 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+/**
+ *
+ * This is the JMX MBean for reporting the RPC layer Activity. The MBean is
+ * register using the name
+ * "hadoop:service=<RpcServiceName>,name=RpcDetailedActivityForPort<port>"
+ *
+ * Many of the activity metrics are sampled and averaged on an interval which
+ * can be specified in the metrics config file.
+ * <p>
+ * For the metrics that are sampled and averaged, one must specify a metrics
+ * context that does periodic update calls. Most metrics contexts do. The
+ * default Null metrics context however does NOT. So if you aren't using any
+ * other metrics context then you can turn on the viewing and averaging of
+ * sampled metrics by specifying the following two lines in the
+ * hadoop-meterics.properties file:
+ *
+ * <pre>
+ * rpc.class=org.apache.hadoop.metrics.spi.NullContextWithUpdateThread
+ * rpc.period=10
+ * </pre>
+ *<p>
+ * Note that the metrics are collected regardless of the context used. The
+ * context with the update thread is used to average the data periodically
+ *
+ * Impl details: We use a dynamic mbean that gets the list of the metrics from
+ * the metrics registry passed as an argument to the constructor
+ */
+public class RpcDetailedActivityMBean extends MetricsDynamicMBeanBase {
+ private final ObjectName mbeanName;
+
+ /**
+ * @param mr - the metrics registry that has all the metrics
+ * @param serviceName - the service name for the rpc service
+ * @param port - the rpc port.
+ */
+ public RpcDetailedActivityMBean(final MetricsRegistry mr,
+ final String serviceName, final String port) {
+ super(mr, "Rpc layer detailed statistics");
+ mbeanName = MBeanUtil.registerMBean(serviceName,
+ "RpcDetailedActivityForPort" + port, this);
+ }
+
+ public void shutdown() {
+ if (mbeanName != null)
+ MBeanUtil.unregisterMBean(mbeanName);
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java?rev=1077234&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java Fri Mar 4 03:54:30 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/**
+ *
+ * This class is for maintaining the various RPC method related statistics
+ * and publishing them through the metrics interfaces.
+ * This also registers the JMX MBean for RPC.
+ */
+public class RpcDetailedMetrics implements Updater {
+ public final MetricsRegistry registry = new MetricsRegistry();
+ private final MetricsRecord metricsRecord;
+ private static final Log LOG = LogFactory.getLog(RpcDetailedMetrics.class);
+ RpcDetailedActivityMBean rpcMBean;
+
+ /**
+ * Statically added metrics to expose at least one metrics, without
+ * which other dynamically added metrics are not exposed over JMX.
+ */
+ final MetricsTimeVaryingRate getProtocolVersion =
+ new MetricsTimeVaryingRate("getProtocolVersion", registry);
+
+ public RpcDetailedMetrics(final String hostName, final String port) {
+ MetricsContext context = MetricsUtil.getContext("rpc");
+ metricsRecord = MetricsUtil.createRecord(context, "detailed-metrics");
+
+ metricsRecord.setTag("port", port);
+
+ LOG.info("Initializing RPC Metrics with hostName="
+ + hostName + ", port=" + port);
+
+ context.registerUpdater(this);
+
+ // Need to clean up the interface to RpcMgt - don't need both metrics and server params
+ rpcMBean = new RpcDetailedActivityMBean(registry, hostName, port);
+ }
+
+
+ /**
+ * Push the metrics to the monitoring subsystem on doUpdate() call.
+ */
+ public void doUpdates(final MetricsContext context) {
+
+ synchronized (this) {
+ for (MetricsBase m : registry.getMetricsList()) {
+ m.pushMetric(metricsRecord);
+ }
+ }
+ metricsRecord.update();
+ }
+
+ public void shutdown() {
+ if (rpcMBean != null)
+ rpcMBean.shutdown();
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/metrics/RpcMetrics.java Fri Mar 4 03:54:30 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.metrics.util.Me
import org.apache.hadoop.metrics.util.MetricsIntValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
/**
@@ -43,13 +44,14 @@ import org.apache.hadoop.metrics.util.Me
*
*/
public class RpcMetrics implements Updater {
- public MetricsRegistry registry = new MetricsRegistry();
- private MetricsRecord metricsRecord;
- private Server myServer;
- private static Log LOG = LogFactory.getLog(RpcMetrics.class);
+ private final MetricsRegistry registry = new MetricsRegistry();
+ private final MetricsRecord metricsRecord;
+ private final Server myServer;
+ private static final Log LOG = LogFactory.getLog(RpcMetrics.class);
RpcActivityMBean rpcMBean;
- public RpcMetrics(String hostName, String port, Server server) {
+ public RpcMetrics(final String hostName, final String port,
+ final Server server) {
myServer = server;
MetricsContext context = MetricsUtil.getContext("rpc");
metricsRecord = MetricsUtil.createRecord(context, "metrics");
@@ -72,26 +74,30 @@ public class RpcMetrics implements Updat
* -they can also be read directly - e.g. JMX does this.
*/
- public MetricsTimeVaryingRate rpcQueueTime =
+ public final MetricsTimeVaryingLong receivedBytes =
+ new MetricsTimeVaryingLong("ReceivedBytes", registry);
+ public final MetricsTimeVaryingLong sentBytes =
+ new MetricsTimeVaryingLong("SentBytes", registry);
+ public final MetricsTimeVaryingRate rpcQueueTime =
new MetricsTimeVaryingRate("RpcQueueTime", registry);
public MetricsTimeVaryingRate rpcProcessingTime =
new MetricsTimeVaryingRate("RpcProcessingTime", registry);
- public MetricsIntValue numOpenConnections =
+ public final MetricsIntValue numOpenConnections =
new MetricsIntValue("NumOpenConnections", registry);
- public MetricsIntValue callQueueLen =
+ public final MetricsIntValue callQueueLen =
new MetricsIntValue("callQueueLen", registry);
- public MetricsTimeVaryingInt authenticationFailures =
+ public final MetricsTimeVaryingInt authenticationFailures =
new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry);
- public MetricsTimeVaryingInt authenticationSuccesses =
+ public final MetricsTimeVaryingInt authenticationSuccesses =
new MetricsTimeVaryingInt("rpcAuthenticationSuccesses", registry);
- public MetricsTimeVaryingInt authorizationFailures =
+ public final MetricsTimeVaryingInt authorizationFailures =
new MetricsTimeVaryingInt("rpcAuthorizationFailures", registry);
- public MetricsTimeVaryingInt authorizationSuccesses =
+ public final MetricsTimeVaryingInt authorizationSuccesses =
new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
/**
* Push the metrics to the monitoring subsystem on doUpdate() call.
*/
- public void doUpdates(MetricsContext context) {
+ public void doUpdates(final MetricsContext context) {
synchronized (this) {
// ToFix - fix server to use the following two metrics directly so
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=1077234&r1=1077233&r2=1077234&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Mar 4 03:54:30 2011
@@ -33,6 +33,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.spi.NullContext;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -246,7 +249,26 @@ public class TestRPC extends TestCase {
stringResult = proxy.echo((String)null);
assertEquals(stringResult, null);
-
+
+ // Check rpcMetrics
+ server.rpcMetrics.doUpdates(new NullContext());
+
+ // Number 4 includes getProtocolVersion()
+ assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
+ assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0);
+ assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0);
+
+ // Number of calls to echo method should be 2
+ server.rpcDetailedMetrics.doUpdates(new NullContext());
+ MetricsTimeVaryingRate metrics =
+ (MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("echo");
+ assertEquals(2, metrics.getPreviousIntervalNumOps());
+
+ // Number of calls to ping method should be 1
+ metrics =
+ (MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("ping");
+ assertEquals(1, metrics.getPreviousIntervalNumOps());
+
String[] stringResults = proxy.echo(new String[]{"foo","bar"});
assertTrue(Arrays.equals(stringResults, new String[]{"foo","bar"}));