You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2021/07/20 07:05:35 UTC

[hadoop] branch branch-3.3 updated: HADOOP-16290. Enable RpcMetrics units to be configurable (#3198)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new ec33119  HADOOP-16290. Enable RpcMetrics units to be configurable (#3198)
ec33119 is described below

commit ec3311975c32034ed4056e2e2154ad77961ac79d
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Tue Jul 20 12:25:49 2021 +0530

    HADOOP-16290. Enable RpcMetrics units to be configurable (#3198)
    
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
    (cherry picked from commit e1d00addb5b6d7240884536aaa57846af34a0dd5)
---
 .../apache/hadoop/fs/CommonConfigurationKeys.java  |  4 +-
 .../org/apache/hadoop/ipc/DecayRpcScheduler.java   |  8 ++-
 .../java/org/apache/hadoop/ipc/RpcScheduler.java   |  8 +--
 .../main/java/org/apache/hadoop/ipc/Server.java    | 10 ++--
 .../org/apache/hadoop/ipc/metrics/RpcMetrics.java  | 38 ++++++++++--
 .../src/main/resources/core-default.xml            | 15 +++++
 .../hadoop-common/src/site/markdown/Metrics.md     |  2 +
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   | 69 +++++++++++++++++++++-
 8 files changed, 133 insertions(+), 21 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 76f2a71e..7b5918e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -378,7 +378,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final boolean RPC_METRICS_QUANTILE_ENABLE_DEFAULT = false;
   public static final String  RPC_METRICS_PERCENTILES_INTERVALS_KEY =
       "rpc.metrics.percentiles.intervals";
-  
+
+  public static final String RPC_METRICS_TIME_UNIT = "rpc.metrics.timeunit";
+
   /** Allowed hosts for nfs exports */
   public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
   public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index cdd62fe..aa64315 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -178,6 +178,7 @@ public class DecayRpcScheduler implements RpcScheduler,
   private final String namespace;
   private final int topUsersCount; // e.g., report top 10 users' metrics
   private static final double PRECISION = 0.0001;
+  private final TimeUnit metricsTimeUnit;
   private MetricsProxy metricsProxy;
   private final CostProvider costProvider;
 
@@ -249,6 +250,8 @@ public class DecayRpcScheduler implements RpcScheduler,
         DecayRpcSchedulerDetailedMetrics.create(ns);
     decayRpcSchedulerDetailedMetrics.init(numLevels);
 
+    metricsTimeUnit = RpcMetrics.getMetricsTimeUnit(conf);
+
     // Setup delay timer
     Timer timer = new Timer(true);
     DecayTask task = new DecayTask(this, timer);
@@ -676,8 +679,9 @@ public class DecayRpcScheduler implements RpcScheduler,
     addCost(user, processingCost);
 
     int priorityLevel = schedulable.getPriorityLevel();
-    long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
-    long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+    long queueTime = details.get(Timing.QUEUE, metricsTimeUnit);
+    long processingTime = details.get(Timing.PROCESSING,
+        metricsTimeUnit);
 
     this.decayRpcSchedulerDetailedMetrics.addQueueTime(
         priorityLevel, queueTime);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
index 5202c6b..8c423b8e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
@@ -62,10 +62,10 @@ public interface RpcScheduler {
     // this interface, a default implementation is supplied which uses the old
     // method. All new implementations MUST override this interface and should
     // NOT use the other addResponseTime method.
-    int queueTime = (int)
-        details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
-    int processingTime = (int)
-        details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+    int queueTime = (int) details.get(ProcessingDetails.Timing.QUEUE,
+        RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
+    int processingTime = (int) details.get(ProcessingDetails.Timing.PROCESSING,
+        RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
     addResponseTime(callName, schedulable.getPriorityLevel(),
         queueTime, processingTime);
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index bfdfaf6..00ffea3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -542,13 +542,13 @@ public abstract class Server {
         (rpcMetrics.getProcessingStdDev() * deviation);
 
     long processingTime =
-            details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+            details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
     if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
         (processingTime > threeSigma)) {
       LOG.warn(
           "Slow RPC : {} took {} {} to process from client {},"
               + " the processing detail is {}",
-          methodName, processingTime, RpcMetrics.TIMEUNIT, call,
+          methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call,
           details.toString());
       rpcMetrics.incrSlowRpc();
     }
@@ -568,7 +568,7 @@ public abstract class Server {
     deltaNanos -= details.get(Timing.RESPONSE);
     details.set(Timing.HANDLER, deltaNanos);
 
-    long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
+    long queueTime = details.get(Timing.QUEUE, rpcMetrics.getMetricsTimeUnit());
     rpcMetrics.addRpcQueueTime(queueTime);
 
     if (call.isResponseDeferred() || connDropped) {
@@ -577,9 +577,9 @@ public abstract class Server {
     }
 
     long processingTime =
-        details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
+        details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
     long waitTime =
-        details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
+        details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
     rpcMetrics.addRpcLockWaitTime(waitTime);
     rpcMetrics.addRpcProcessingTime(processingTime);
     // don't include lock wait for detailed metrics.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index 439b873..a0351ee 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ipc.metrics;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.Server;
@@ -48,9 +49,12 @@ public class RpcMetrics {
   final MetricsRegistry registry;
   final String name;
   final boolean rpcQuantileEnable;
+
+  public static final TimeUnit DEFAULT_METRIC_TIME_UNIT =
+      TimeUnit.MILLISECONDS;
   /** The time unit used when storing/accessing time durations. */
-  public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
-  
+  private final TimeUnit metricsTimeUnit;
+
   RpcMetrics(Server server, Configuration conf) {
     String port = String.valueOf(server.getListenerAddress().getPort());
     name = "RpcActivityForPort" + port;
@@ -63,6 +67,7 @@ public class RpcMetrics {
     rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
         CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE,
         CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
+    metricsTimeUnit = getMetricsTimeUnit(conf);
     if (rpcQuantileEnable) {
       rpcQueueTimeQuantiles =
           new MutableQuantiles[intervals.length];
@@ -75,19 +80,19 @@ public class RpcMetrics {
       for (int i = 0; i < intervals.length; i++) {
         int interval = intervals[i];
         rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
-            + interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
+            + interval + "s", "rpc queue time in " + metricsTimeUnit, "ops",
             "latency", interval);
         rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
             "rpcLockWaitTime" + interval + "s",
-            "rpc lock wait time in " + TIMEUNIT, "ops",
+            "rpc lock wait time in " + metricsTimeUnit, "ops",
             "latency", interval);
         rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
             "rpcProcessingTime" + interval + "s",
-            "rpc processing time in " + TIMEUNIT, "ops",
+            "rpc processing time in " + metricsTimeUnit, "ops",
             "latency", interval);
         deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
             "deferredRpcProcessingTime" + interval + "s",
-            "deferred rpc processing time in " + TIMEUNIT, "ops",
+            "deferred rpc processing time in " + metricsTimeUnit, "ops",
             "latency", interval);
       }
     }
@@ -141,6 +146,42 @@ public class RpcMetrics {
     return server.getNumDroppedConnections();
   }
 
+  /**
+   * @return the time unit in which this instance records RPC time metrics.
+   */
+  public TimeUnit getMetricsTimeUnit() {
+    return metricsTimeUnit;
+  }
+
+  /**
+   * Resolves the RPC metrics time unit from the configuration key
+   * {@link CommonConfigurationKeys#RPC_METRICS_TIME_UNIT}. The value is
+   * trimmed and must be the name of a {@link TimeUnit} constant
+   * (case-sensitive); an unset, empty or unrecognized value yields
+   * {@link #DEFAULT_METRIC_TIME_UNIT}, and an unrecognized one is logged.
+   *
+   * @param conf configuration to read the time unit from.
+   * @return the configured time unit, or the default one.
+   */
+  public static TimeUnit getMetricsTimeUnit(Configuration conf) {
+    TimeUnit metricsTimeUnit = RpcMetrics.DEFAULT_METRIC_TIME_UNIT;
+    String timeunit = conf.getTrimmed(
+        CommonConfigurationKeys.RPC_METRICS_TIME_UNIT);
+    if (StringUtils.isNotEmpty(timeunit)) {
+      try {
+        metricsTimeUnit = TimeUnit.valueOf(timeunit);
+      } catch (IllegalArgumentException e) {
+        // Fall back to the default instead of propagating, but log at WARN
+        // so that the ignored misconfiguration is visible to operators.
+        LOG.warn("Config key {}'s value {} does not correspond to enum values"
+            + " of java.util.concurrent.TimeUnit. Hence default unit {} will"
+            + " be used", CommonConfigurationKeys.RPC_METRICS_TIME_UNIT,
+            timeunit, RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
+      }
+    }
+    return metricsTimeUnit;
+  }
+
   // Public instrumentation methods that could be extracted to an
   // abstract class if we decide to do custom instrumentation classes a la
   // JobTrackerInstrumentation. The methods with //@Override comment are
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 43bbe7e..64c3390 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -3294,6 +3294,21 @@
 </property>
 
 <property>
+  <name>rpc.metrics.timeunit</name>
+  <value>MILLISECONDS</value>
+  <description>
+    Time unit used for various RPC metrics, e.g. rpcQueueTime,
+    rpcLockWaitTime, rpcProcessingTime and deferredRpcProcessingTime.
+    If this property is not set, milliseconds are used.
+    The value must be the name of one of the constants of the enum
+    java.util.concurrent.TimeUnit (case-sensitive), for example
+    NANOSECONDS, MICROSECONDS, MILLISECONDS or SECONDS.
+    An unrecognized value is ignored and the default unit
+    (MILLISECONDS) is used instead.
+  </description>
+</property>
+
+<property>
   <name>rpc.metrics.percentiles.intervals</name>
   <value></value>
   <description>
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 4a078d7..b4d4503 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -65,6 +65,8 @@ rpc
 ---
 
 Each metrics record contains tags such as Hostname and port (number to which server is bound) as additional information along with metrics.
+The `rpc.metrics.timeunit` configuration property sets the time unit used for RPC metrics.
+The default time unit is milliseconds, as stated in the metric descriptions below.
 
 | Name | Description |
 |:---- |:---- |
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 9fbb865..b78900b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ipc.Server.Connection;
-import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos;
@@ -1098,8 +1097,8 @@ public class TestRPC extends TestRpcBase {
       proxy.lockAndSleep(null, newSleepRequest(5));
       rpcMetrics = getMetrics(server.getRpcMetrics().name());
       assertGauge("RpcLockWaitTimeAvgTime",
-          (double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
-          rpcMetrics);
+          (double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
+              TimeUnit.SECONDS)), rpcMetrics);
     } finally {
       if (proxy2 != null) {
         RPC.stopProxy(proxy2);
@@ -1603,6 +1602,70 @@ public class TestRPC extends TestRpcBase {
     assertTrue(rpcEngine instanceof StoppedRpcEngine);
   }
 
+  @Test
+  public void testRpcMetricsInNanos() throws Exception {
+    final Server server;
+    TestRpcService proxy = null;
+
+    final int interval = 1;
+    conf.setBoolean(CommonConfigurationKeys.
+        RPC_METRICS_QUANTILE_ENABLE, true);
+    conf.set(CommonConfigurationKeys.
+        RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
+    conf.set(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, "NANOSECONDS");
+
+    server = setupTestServer(conf, 5);
+    String testUser = "testUserInNanos";
+    UserGroupInformation anotherUser =
+        UserGroupInformation.createRemoteUser(testUser);
+    TestRpcService proxy2 =
+        anotherUser.doAs((PrivilegedAction<TestRpcService>) () -> {
+          try {
+            return RPC.getProxy(TestRpcService.class, 0,
+                server.getListenerAddress(), conf);
+          } catch (IOException e) {
+            LOG.error("Something went wrong.", e);
+          }
+          return null;
+        });
+    try {
+      proxy = getClient(addr, conf);
+      for (int i = 0; i < 100; i++) {
+        proxy.ping(null, newEmptyRequest());
+        proxy.echo(null, newEchoRequest("" + i));
+        proxy2.echo(null, newEchoRequest("" + i));
+      }
+      MetricsRecordBuilder rpcMetrics =
+          getMetrics(server.getRpcMetrics().name());
+      assertEquals("Expected zero rpc lock wait time",
+          0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
+      MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
+          rpcMetrics);
+      MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
+          rpcMetrics);
+
+      proxy.lockAndSleep(null, newSleepRequest(5));
+      rpcMetrics = getMetrics(server.getRpcMetrics().name());
+      assertGauge("RpcLockWaitTimeAvgTime",
+          (double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
+              TimeUnit.SECONDS)), rpcMetrics);
+      LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}",
+          getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics),
+          getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics));
+
+      assertTrue(getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics)
+          > 4000000D);
+      assertTrue(getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics)
+          > 4000D);
+    } finally {
+      if (proxy2 != null) {
+        RPC.stopProxy(proxy2);
+      }
+      stop(server, proxy);
+    }
+  }
+
+
   public static void main(String[] args) throws Exception {
     new TestRPC().testCallsInternal(conf);
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org