You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2022/06/18 04:17:27 UTC

[hadoop] branch trunk updated: HADOOP-18288. Total requests and total requests per sec served by RPC servers (#4431)

This is an automated email from the ASF dual-hosted git repository.

tomscut pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e38e13be03d HADOOP-18288. Total requests and total requests per sec served by RPC servers (#4431)
e38e13be03d is described below

commit e38e13be03d581574f66c50b56a0918844849e17
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri Jun 17 21:17:20 2022 -0700

    HADOOP-18288. Total requests and total requests per sec served by RPC servers (#4431)
    
    Reviewed-by: Steve Loughran <st...@apache.org>
    Signed-off-by: Tao Li <to...@apache.org>
---
 .../hadoop/fs/CommonConfigurationKeysPublic.java   |  8 +++
 .../main/java/org/apache/hadoop/ipc/Server.java    | 70 ++++++++++++++++++++++
 .../org/apache/hadoop/ipc/metrics/RpcMetrics.java  | 10 ++++
 .../hadoop-common/src/site/markdown/Metrics.md     |  2 +
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   | 58 ++++++++++++++++++
 .../hdfs/qjournal/server/TestJournalNodeSync.java  |  4 ++
 .../namenode/ha/TestConsistentReadsObserver.java   |  3 +
 7 files changed, 155 insertions(+)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index 52252365092..7a458e8f3fc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -1054,5 +1054,13 @@ public class CommonConfigurationKeysPublic {
   public static final String HADOOP_HTTP_IDLE_TIMEOUT_MS_KEY =
       "hadoop.http.idle_timeout.ms";
   public static final int HADOOP_HTTP_IDLE_TIMEOUT_MS_DEFAULT = 60000;
+
+  /**
+   * To configure scheduling of server metrics update thread. This config is used to indicate
+   * initial delay and delay between each execution of the metric update runnable thread.
+   */
+  public static final String IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL =
+      "ipc.server.metrics.update.runner.interval";
+  public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000;
 }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 90f730d3883..e79612f7a5a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -65,9 +65,12 @@ import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 
 import javax.security.sasl.Sasl;
@@ -127,6 +130,8 @@ import org.apache.hadoop.tracing.Tracer;
 import org.apache.hadoop.tracing.TraceUtils;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.classification.VisibleForTesting;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.thirdparty.protobuf.ByteString;
 import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;
 import org.apache.hadoop.thirdparty.protobuf.Message;
@@ -500,6 +505,11 @@ public abstract class Server {
   private Responder responder = null;
   private Handler[] handlers = null;
   private final AtomicInteger numInProcessHandler = new AtomicInteger();
+  private final LongAdder totalRequests = new LongAdder();
+  private long lastSeenTotalRequests = 0;
+  private long totalRequestsPerSecond = 0;
+  private final long metricsUpdaterInterval;
+  private final ScheduledExecutorService scheduledExecutorService;
 
   private boolean logSlowRPC = false;
 
@@ -515,6 +525,14 @@ public abstract class Server {
     return numInProcessHandler.get();
   }
 
+  public long getTotalRequests() {
+    return totalRequests.sum();
+  }
+
+  public long getTotalRequestsPerSecond() {
+    return totalRequestsPerSecond;
+  }
+
   /**
    * Sets slow RPC flag.
    * @param logSlowRPCFlag input logSlowRPCFlag.
@@ -578,6 +596,7 @@ public abstract class Server {
   }
 
   void updateMetrics(Call call, long startTime, boolean connDropped) {
+    totalRequests.increment();
     // delta = handler + processing + response
     long deltaNanos = Time.monotonicNowNanos() - startTime;
     long timestampNanos = call.timestampNanos;
@@ -3304,6 +3323,14 @@ public abstract class Server {
     this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
     this.exceptionsHandler.addTerseLoggingExceptions(
         HealthCheckFailedException.class);
+    this.metricsUpdaterInterval =
+        conf.getLong(CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL,
+            CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT);
+    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Hadoop-Metrics-Updater-%d")
+            .build());
+    this.scheduledExecutorService.scheduleWithFixedDelay(new MetricsUpdateRunner(),
+        metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
   }
 
   public synchronized void addAuxiliaryListener(int auxiliaryPort)
@@ -3598,10 +3625,25 @@ public abstract class Server {
     }
     responder.interrupt();
     notifyAll();
+    shutdownMetricsUpdaterExecutor();
     this.rpcMetrics.shutdown();
     this.rpcDetailedMetrics.shutdown();
   }
 
+  private void shutdownMetricsUpdaterExecutor() {
+    this.scheduledExecutorService.shutdown();
+    try {
+      boolean isExecutorShutdown =
+          this.scheduledExecutorService.awaitTermination(3, TimeUnit.SECONDS);
+      if (!isExecutorShutdown) {
+        LOG.info("Hadoop Metrics Updater executor could not be shutdown.");
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Hadoop Metrics Updater executor shutdown interrupted.", e);
+    }
+  }
+
   /**
    * Wait for the server to be stopped.
    * Does not wait for all subthreads to finish.
@@ -4061,4 +4103,32 @@ public abstract class Server {
   public String getServerName() {
     return serverName;
   }
+
+  /**
+   * Server metrics updater thread, used to update some metrics on a regular basis.
+   * For instance, requests per second.
+   */
+  private class MetricsUpdateRunner implements Runnable {
+
+    private long lastExecuted = 0;
+
+    @Override
+    public synchronized void run() {
+      long currentTime = Time.monotonicNow();
+      if (lastExecuted == 0) {
+        lastExecuted = currentTime - metricsUpdaterInterval;
+      }
+      long currentTotalRequests = totalRequests.sum();
+      long totalRequestsDiff = currentTotalRequests - lastSeenTotalRequests;
+      lastSeenTotalRequests = currentTotalRequests;
+      if ((currentTime - lastExecuted) > 0) {
+        double totalRequestsPerSecInDouble =
+            (double) totalRequestsDiff / TimeUnit.MILLISECONDS.toSeconds(
+                currentTime - lastExecuted);
+        totalRequestsPerSecond = ((long) totalRequestsPerSecInDouble);
+      }
+      lastExecuted = currentTime;
+    }
+  }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index a67530b3c97..bf21e3865fa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -151,6 +151,16 @@ public class RpcMetrics {
     return server.getNumDroppedConnections();
   }
 
+  @Metric("Number of total requests")
+  public long getTotalRequests() {
+    return server.getTotalRequests();
+  }
+
+  @Metric("Number of total requests per second")
+  public long getTotalRequestsPerSecond() {
+    return server.getTotalRequestsPerSecond();
+  }
+
   public TimeUnit getMetricsTimeUnit() {
     return metricsTimeUnit;
   }
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 47786e473a5..04cbd9fedf8 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -104,6 +104,8 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
 | `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
 | `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
 | `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `TotalRequests` | Total num of requests served by the RPC server. |
+| `TotalRequestsPerSeconds` | Total num of requests per second served by the RPC server. |
 
 RetryCache/NameNodeRetryCache
 -----------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 5fc9cb5410b..5caabd22a88 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.ipc;
 
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
@@ -84,6 +86,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -1697,6 +1700,61 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test
+  public void testNumTotalRequestsMetrics() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.
+        createUserForTesting("userXyz", new String[0]);
+
+    final Server server = setupTestServer(conf, 1);
+
+    ExecutorService executorService = null;
+    try {
+      RpcMetrics rpcMetrics = server.getRpcMetrics();
+      assertEquals(0, rpcMetrics.getTotalRequests());
+      assertEquals(0, rpcMetrics.getTotalRequestsPerSecond());
+
+      List<ExternalCall<Void>> externalCallList = new ArrayList<>();
+
+      executorService = Executors.newSingleThreadExecutor(
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("testNumTotalRequestsMetrics")
+              .build());
+      AtomicInteger rps = new AtomicInteger(0);
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      executorService.submit(() -> {
+        while (true) {
+          int numRps = (int) rpcMetrics.getTotalRequestsPerSecond();
+          rps.getAndSet(numRps);
+          if (rps.get() > 0) {
+            countDownLatch.countDown();
+            break;
+          }
+        }
+      });
+
+      for (int i = 0; i < 100000; i++) {
+        externalCallList.add(newExtCall(ugi, () -> null));
+      }
+      for (ExternalCall<Void> externalCall : externalCallList) {
+        server.queueCall(externalCall);
+      }
+      for (ExternalCall<Void> externalCall : externalCallList) {
+        externalCall.get();
+      }
+
+      assertEquals(100000, rpcMetrics.getTotalRequests());
+      if (countDownLatch.await(10, TimeUnit.SECONDS)) {
+        assertTrue(rps.get() > 10);
+      } else {
+        throw new AssertionError("total requests per seconds are still 0");
+      }
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+      server.stop();
+    }
+  }
+
 
   public static void main(String[] args) throws Exception {
     new TestRPC().testCallsInternal(conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
index bc4cf3a6ee7..1564e41031a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
     .getLogFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Lists;
@@ -106,6 +108,8 @@ public class TestJournalNodeSync {
     File firstJournalDir = jCluster.getJournalDir(0, jid);
     File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
         .getCurrentDir();
+    assertThat(jCluster.getJournalNode(0).getRpcServer().getRpcServer().getRpcMetrics()
+        .getTotalRequests()).isGreaterThan(20);
 
     // Generate some edit logs and delete one.
     long firstTxId = generateEditLog();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index decf85c06a4..ff6c2288b53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -123,6 +124,7 @@ public class TestConsistentReadsObserver {
         + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
 
     NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
+    assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(0);
 
     dfs.create(testPath, (short)1).close();
     assertSentTo(0);
@@ -132,6 +134,7 @@ public class TestConsistentReadsObserver {
     // be triggered and client should retry active NN.
     dfs.getFileStatus(testPath);
     assertSentTo(0);
+    assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(1);
     // reset the original call queue
     NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org