You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2021/07/20 07:05:35 UTC
[hadoop] branch branch-3.3 updated: HADOOP-16290. Enable RpcMetrics
units to be configurable (#3198)
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new ec33119 HADOOP-16290. Enable RpcMetrics units to be configurable (#3198)
ec33119 is described below
commit ec3311975c32034ed4056e2e2154ad77961ac79d
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Tue Jul 20 12:25:49 2021 +0530
HADOOP-16290. Enable RpcMetrics units to be configurable (#3198)
Signed-off-by: Akira Ajisaka <aa...@apache.org>
(cherry picked from commit e1d00addb5b6d7240884536aaa57846af34a0dd5)
---
.../apache/hadoop/fs/CommonConfigurationKeys.java | 4 +-
.../org/apache/hadoop/ipc/DecayRpcScheduler.java | 8 ++-
.../java/org/apache/hadoop/ipc/RpcScheduler.java | 8 +--
.../main/java/org/apache/hadoop/ipc/Server.java | 10 ++--
.../org/apache/hadoop/ipc/metrics/RpcMetrics.java | 38 ++++++++++--
.../src/main/resources/core-default.xml | 15 +++++
.../hadoop-common/src/site/markdown/Metrics.md | 2 +
.../test/java/org/apache/hadoop/ipc/TestRPC.java | 69 +++++++++++++++++++++-
8 files changed, 133 insertions(+), 21 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 76f2a71e..7b5918e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -378,7 +378,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final boolean RPC_METRICS_QUANTILE_ENABLE_DEFAULT = false;
public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY =
"rpc.metrics.percentiles.intervals";
-
+
+ public static final String RPC_METRICS_TIME_UNIT = "rpc.metrics.timeunit";
+
/** Allowed hosts for nfs exports */
public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index cdd62fe..aa64315 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -178,6 +178,7 @@ public class DecayRpcScheduler implements RpcScheduler,
private final String namespace;
private final int topUsersCount; // e.g., report top 10 users' metrics
private static final double PRECISION = 0.0001;
+ private final TimeUnit metricsTimeUnit;
private MetricsProxy metricsProxy;
private final CostProvider costProvider;
@@ -249,6 +250,8 @@ public class DecayRpcScheduler implements RpcScheduler,
DecayRpcSchedulerDetailedMetrics.create(ns);
decayRpcSchedulerDetailedMetrics.init(numLevels);
+ metricsTimeUnit = RpcMetrics.getMetricsTimeUnit(conf);
+
// Setup delay timer
Timer timer = new Timer(true);
DecayTask task = new DecayTask(this, timer);
@@ -676,8 +679,9 @@ public class DecayRpcScheduler implements RpcScheduler,
addCost(user, processingCost);
int priorityLevel = schedulable.getPriorityLevel();
- long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
- long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+ long queueTime = details.get(Timing.QUEUE, metricsTimeUnit);
+ long processingTime = details.get(Timing.PROCESSING,
+ metricsTimeUnit);
this.decayRpcSchedulerDetailedMetrics.addQueueTime(
priorityLevel, queueTime);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
index 5202c6b..8c423b8e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
@@ -62,10 +62,10 @@ public interface RpcScheduler {
// this interface, a default implementation is supplied which uses the old
// method. All new implementations MUST override this interface and should
// NOT use the other addResponseTime method.
- int queueTime = (int)
- details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
- int processingTime = (int)
- details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+ int queueTime = (int) details.get(ProcessingDetails.Timing.QUEUE,
+ RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
+ int processingTime = (int) details.get(ProcessingDetails.Timing.PROCESSING,
+ RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
addResponseTime(callName, schedulable.getPriorityLevel(),
queueTime, processingTime);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index bfdfaf6..00ffea3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -542,13 +542,13 @@ public abstract class Server {
(rpcMetrics.getProcessingStdDev() * deviation);
long processingTime =
- details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+ details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) {
LOG.warn(
"Slow RPC : {} took {} {} to process from client {},"
+ " the processing detail is {}",
- methodName, processingTime, RpcMetrics.TIMEUNIT, call,
+ methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call,
details.toString());
rpcMetrics.incrSlowRpc();
}
@@ -568,7 +568,7 @@ public abstract class Server {
deltaNanos -= details.get(Timing.RESPONSE);
details.set(Timing.HANDLER, deltaNanos);
- long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
+ long queueTime = details.get(Timing.QUEUE, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcQueueTime(queueTime);
if (call.isResponseDeferred() || connDropped) {
@@ -577,9 +577,9 @@ public abstract class Server {
}
long processingTime =
- details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+ details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
long waitTime =
- details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
+ details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcLockWaitTime(waitTime);
rpcMetrics.addRpcProcessingTime(processingTime);
// don't include lock wait for detailed metrics.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index 439b873..a0351ee 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ipc.metrics;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.Server;
@@ -48,9 +49,12 @@ public class RpcMetrics {
final MetricsRegistry registry;
final String name;
final boolean rpcQuantileEnable;
+
+ public static final TimeUnit DEFAULT_METRIC_TIME_UNIT =
+ TimeUnit.MILLISECONDS;
/** The time unit used when storing/accessing time durations. */
- public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
-
+ private final TimeUnit metricsTimeUnit;
+
RpcMetrics(Server server, Configuration conf) {
String port = String.valueOf(server.getListenerAddress().getPort());
name = "RpcActivityForPort" + port;
@@ -63,6 +67,7 @@ public class RpcMetrics {
rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE,
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
+ metricsTimeUnit = getMetricsTimeUnit(conf);
if (rpcQuantileEnable) {
rpcQueueTimeQuantiles =
new MutableQuantiles[intervals.length];
@@ -75,19 +80,19 @@ public class RpcMetrics {
for (int i = 0; i < intervals.length; i++) {
int interval = intervals[i];
rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
- + interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
+ + interval + "s", "rpc queue time in " + metricsTimeUnit, "ops",
"latency", interval);
rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
"rpcLockWaitTime" + interval + "s",
- "rpc lock wait time in " + TIMEUNIT, "ops",
+ "rpc lock wait time in " + metricsTimeUnit, "ops",
"latency", interval);
rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
"rpcProcessingTime" + interval + "s",
- "rpc processing time in " + TIMEUNIT, "ops",
+ "rpc processing time in " + metricsTimeUnit, "ops",
"latency", interval);
deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
"deferredRpcProcessingTime" + interval + "s",
- "deferred rpc processing time in " + TIMEUNIT, "ops",
+ "deferred rpc processing time in " + metricsTimeUnit, "ops",
"latency", interval);
}
}
@@ -141,6 +146,27 @@ public class RpcMetrics {
return server.getNumDroppedConnections();
}
+ public TimeUnit getMetricsTimeUnit() {
+ return metricsTimeUnit;
+ }
+
+ public static TimeUnit getMetricsTimeUnit(Configuration conf) {
+ TimeUnit metricsTimeUnit = RpcMetrics.DEFAULT_METRIC_TIME_UNIT;
+ String timeunit = conf.get(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT);
+ if (StringUtils.isNotEmpty(timeunit)) {
+ try {
+ metricsTimeUnit = TimeUnit.valueOf(timeunit);
+ } catch (IllegalArgumentException e) {
+ LOG.info("Config key {} 's value {} does not correspond to enum values"
+ + " of java.util.concurrent.TimeUnit. Hence default unit"
+ + " {} will be used",
+ CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, timeunit,
+ RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
+ }
+ }
+ return metricsTimeUnit;
+ }
+
// Public instrumentation methods that could be extracted to an
// abstract class if we decide to do custom instrumentation classes a la
// JobTrackerInstrumentation. The methods with //@Override comment are
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 43bbe7e..64c3390 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -3294,6 +3294,21 @@
</property>
<property>
+ <name>rpc.metrics.timeunit</name>
+ <value>MILLISECONDS</value>
+ <description>
+ This property configures the time unit used for various RPC metrics,
+ e.g. rpcQueueTime, rpcLockWaitTime, rpcProcessingTime and
+ deferredRpcProcessingTime. If this property is not set,
+ the default time unit is milliseconds.
+ The value must match one of the constants of the enum
+ java.util.concurrent.TimeUnit.
+ Some of the valid values: NANOSECONDS, MICROSECONDS, MILLISECONDS,
+ SECONDS, etc.
+ </description>
+</property>
+
+<property>
<name>rpc.metrics.percentiles.intervals</name>
<value></value>
<description>
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 4a078d7..b4d4503 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -65,6 +65,8 @@ rpc
---
Each metrics record contains tags such as Hostname and port (number to which server is bound) as additional information along with metrics.
+The `rpc.metrics.timeunit` configuration property sets the time unit used for RPC metrics.
+The default time unit is milliseconds, which is the unit stated in the descriptions below.
| Name | Description |
|:---- |:---- |
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 9fbb865..b78900b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.Server.Connection;
-import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.TestProtos;
@@ -1098,8 +1097,8 @@ public class TestRPC extends TestRpcBase {
proxy.lockAndSleep(null, newSleepRequest(5));
rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertGauge("RpcLockWaitTimeAvgTime",
- (double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
- rpcMetrics);
+ (double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
+ TimeUnit.SECONDS)), rpcMetrics);
} finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);
@@ -1603,6 +1602,70 @@ public class TestRPC extends TestRpcBase {
assertTrue(rpcEngine instanceof StoppedRpcEngine);
}
+ @Test
+ public void testRpcMetricsInNanos() throws Exception {
+ final Server server;
+ TestRpcService proxy = null;
+
+ final int interval = 1;
+ conf.setBoolean(CommonConfigurationKeys.
+ RPC_METRICS_QUANTILE_ENABLE, true);
+ conf.set(CommonConfigurationKeys.
+ RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
+ conf.set(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, "NANOSECONDS");
+
+ server = setupTestServer(conf, 5);
+ String testUser = "testUserInNanos";
+ UserGroupInformation anotherUser =
+ UserGroupInformation.createRemoteUser(testUser);
+ TestRpcService proxy2 =
+ anotherUser.doAs((PrivilegedAction<TestRpcService>) () -> {
+ try {
+ return RPC.getProxy(TestRpcService.class, 0,
+ server.getListenerAddress(), conf);
+ } catch (IOException e) {
+ LOG.error("Something went wrong.", e);
+ }
+ return null;
+ });
+ try {
+ proxy = getClient(addr, conf);
+ for (int i = 0; i < 100; i++) {
+ proxy.ping(null, newEmptyRequest());
+ proxy.echo(null, newEchoRequest("" + i));
+ proxy2.echo(null, newEchoRequest("" + i));
+ }
+ MetricsRecordBuilder rpcMetrics =
+ getMetrics(server.getRpcMetrics().name());
+ assertEquals("Expected zero rpc lock wait time",
+ 0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
+ MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
+ rpcMetrics);
+ MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
+ rpcMetrics);
+
+ proxy.lockAndSleep(null, newSleepRequest(5));
+ rpcMetrics = getMetrics(server.getRpcMetrics().name());
+ assertGauge("RpcLockWaitTimeAvgTime",
+ (double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
+ TimeUnit.SECONDS)), rpcMetrics);
+ LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}",
+ getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics),
+ getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics));
+
+ assertTrue(getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics)
+ > 4000000D);
+ assertTrue(getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics)
+ > 4000D);
+ } finally {
+ if (proxy2 != null) {
+ RPC.stopProxy(proxy2);
+ }
+ stop(server, proxy);
+ }
+ }
+
+
public static void main(String[] args) throws Exception {
new TestRPC().testCallsInternal(conf);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org