You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2022/06/18 04:17:27 UTC
[hadoop] branch trunk updated: HADOOP-18288. Total requests and total requests per sec served by RPC servers (#4431)
This is an automated email from the ASF dual-hosted git repository.
tomscut pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new e38e13be03d HADOOP-18288. Total requests and total requests per sec served by RPC servers (#4431)
e38e13be03d is described below
commit e38e13be03d581574f66c50b56a0918844849e17
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri Jun 17 21:17:20 2022 -0700
HADOOP-18288. Total requests and total requests per sec served by RPC servers (#4431)
Reviewed-by: Steve Loughran <st...@apache.org>
Signed-off-by: Tao Li <to...@apache.org>
---
.../hadoop/fs/CommonConfigurationKeysPublic.java | 8 +++
.../main/java/org/apache/hadoop/ipc/Server.java | 70 ++++++++++++++++++++++
.../org/apache/hadoop/ipc/metrics/RpcMetrics.java | 10 ++++
.../hadoop-common/src/site/markdown/Metrics.md | 2 +
.../test/java/org/apache/hadoop/ipc/TestRPC.java | 58 ++++++++++++++++++
.../hdfs/qjournal/server/TestJournalNodeSync.java | 4 ++
.../namenode/ha/TestConsistentReadsObserver.java | 3 +
7 files changed, 155 insertions(+)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index 52252365092..7a458e8f3fc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -1054,5 +1054,13 @@ public class CommonConfigurationKeysPublic {
public static final String HADOOP_HTTP_IDLE_TIMEOUT_MS_KEY =
"hadoop.http.idle_timeout.ms";
public static final int HADOOP_HTTP_IDLE_TIMEOUT_MS_DEFAULT = 60000;
+
+ /**
+ * Interval, in milliseconds, used to schedule the IPC server's metrics update runnable.
+ * The same value is used both as the initial delay before the first execution and as the
+ * delay between the end of one execution and the start of the next.
+ * The default is 5000 (5 seconds).
+ */
+ public static final String IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL =
+ "ipc.server.metrics.update.runner.interval";
+ public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 90f730d3883..e79612f7a5a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -65,9 +65,12 @@ import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import javax.security.sasl.Sasl;
@@ -127,6 +130,8 @@ import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.tracing.TraceUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.classification.VisibleForTesting;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;
import org.apache.hadoop.thirdparty.protobuf.Message;
@@ -500,6 +505,11 @@ public abstract class Server {
private Responder responder = null;
private Handler[] handlers = null;
private final AtomicInteger numInProcessHandler = new AtomicInteger();
+ // Incremented once per updateMetrics() invocation.
+ private final LongAdder totalRequests = new LongAdder();
+ // Value of totalRequests seen by the previous MetricsUpdateRunner execution.
+ private long lastSeenTotalRequests = 0;
+ // Written by the metrics updater thread and read by other threads through
+ // getTotalRequestsPerSecond(); volatile so that readers observe the latest value.
+ private volatile long totalRequestsPerSecond = 0;
+ // Initial delay and delay between metrics updater executions, in milliseconds.
+ private final long metricsUpdaterInterval;
+ // Single-threaded scheduler that runs MetricsUpdateRunner.
+ private final ScheduledExecutorService scheduledExecutorService;
private boolean logSlowRPC = false;
@@ -515,6 +525,14 @@ public abstract class Server {
return numInProcessHandler.get();
}
+ /**
+ * Returns the current sum of the total requests counter, which is incremented once for
+ * every call passed to updateMetrics.
+ *
+ * @return total number of requests counted so far.
+ */
+ public long getTotalRequests() {
+ return totalRequests.sum();
+ }
+
+ /**
+ * Returns the requests per second value most recently stored by the metrics update
+ * runner. The value is 0 until the runner has stored a rate.
+ *
+ * @return last computed number of requests per second.
+ */
+ public long getTotalRequestsPerSecond() {
+ return totalRequestsPerSecond;
+ }
+
/**
* Sets slow RPC flag.
* @param logSlowRPCFlag input logSlowRPCFlag.
@@ -578,6 +596,7 @@ public abstract class Server {
}
void updateMetrics(Call call, long startTime, boolean connDropped) {
+ totalRequests.increment();
// delta = handler + processing + response
long deltaNanos = Time.monotonicNowNanos() - startTime;
long timestampNanos = call.timestampNanos;
@@ -3304,6 +3323,14 @@ public abstract class Server {
this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
this.exceptionsHandler.addTerseLoggingExceptions(
HealthCheckFailedException.class);
+ this.metricsUpdaterInterval =
+ conf.getLong(CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL,
+ CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT);
+ this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Hadoop-Metrics-Updater-%d")
+ .build());
+ this.scheduledExecutorService.scheduleWithFixedDelay(new MetricsUpdateRunner(),
+ metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
}
public synchronized void addAuxiliaryListener(int auxiliaryPort)
@@ -3598,10 +3625,25 @@ public abstract class Server {
}
responder.interrupt();
notifyAll();
+ shutdownMetricsUpdaterExecutor();
this.rpcMetrics.shutdown();
this.rpcDetailedMetrics.shutdown();
}
+ /**
+  * Stops the metrics updater executor and waits up to three seconds for it to terminate.
+  * If it has not terminated within that time, or the wait is interrupted, the executor is
+  * stopped forcibly so that its worker thread is not left running after the server stops.
+  */
+ private void shutdownMetricsUpdaterExecutor() {
+   this.scheduledExecutorService.shutdown();
+   try {
+     boolean isExecutorShutdown =
+         this.scheduledExecutorService.awaitTermination(3, TimeUnit.SECONDS);
+     if (!isExecutorShutdown) {
+       LOG.info("Hadoop Metrics Updater executor could not be shutdown.");
+       // Graceful shutdown timed out: interrupt the worker instead of leaving it behind.
+       this.scheduledExecutorService.shutdownNow();
+     }
+   } catch (InterruptedException e) {
+     // The wait was cut short, so cancel whatever is still running before returning.
+     this.scheduledExecutorService.shutdownNow();
+     // Preserve the caller's interrupt status.
+     Thread.currentThread().interrupt();
+     LOG.info("Hadoop Metrics Updater executor shutdown interrupted.", e);
+   }
+ }
+
/**
* Wait for the server to be stopped.
* Does not wait for all subthreads to finish.
@@ -4061,4 +4103,32 @@ public abstract class Server {
public String getServerName() {
return serverName;
}
+
+ /**
+  * Server metrics updater thread, used to update some metrics on a regular basis.
+  * For instance, requests per second.
+  *
+  * On every execution it compares the current total request count with the count seen
+  * by the previous execution and publishes the resulting rate.
+  */
+ private class MetricsUpdateRunner implements Runnable {
+
+   // Monotonic time (ms) of the previous execution; 0 means "has not run yet".
+   private long lastExecuted = 0;
+
+   @Override
+   public synchronized void run() {
+     long currentTime = Time.monotonicNow();
+     if (lastExecuted == 0) {
+       // First execution: assume the configured initial delay has elapsed.
+       lastExecuted = currentTime - metricsUpdaterInterval;
+     }
+     long currentTotalRequests = totalRequests.sum();
+     long totalRequestsDiff = currentTotalRequests - lastSeenTotalRequests;
+     lastSeenTotalRequests = currentTotalRequests;
+     long elapsedMillis = currentTime - lastExecuted;
+     if (elapsedMillis > 0) {
+       // Scale by the elapsed milliseconds rather than by whole seconds. Truncating to
+       // seconds gives a zero divisor for intervals shorter than one second and
+       // overstates the rate for intervals that are not a whole number of seconds.
+       double totalRequestsPerSecInDouble =
+           (double) totalRequestsDiff * TimeUnit.SECONDS.toMillis(1) / elapsedMillis;
+       totalRequestsPerSecond = ((long) totalRequestsPerSecInDouble);
+     }
+     lastExecuted = currentTime;
+   }
+ }
+
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index a67530b3c97..bf21e3865fa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -151,6 +151,16 @@ public class RpcMetrics {
return server.getNumDroppedConnections();
}
+ /**
+ * Exposes the RPC server's total request count as a metric.
+ *
+ * @return the value returned by the server's getTotalRequests().
+ */
+ @Metric("Number of total requests")
+ public long getTotalRequests() {
+ return server.getTotalRequests();
+ }
+
+ /**
+ * Exposes the RPC server's requests per second value as a metric.
+ *
+ * @return the value returned by the server's getTotalRequestsPerSecond().
+ */
+ @Metric("Number of total requests per second")
+ public long getTotalRequestsPerSecond() {
+ return server.getTotalRequestsPerSecond();
+ }
+
public TimeUnit getMetricsTimeUnit() {
return metricsTimeUnit;
}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 47786e473a5..04cbd9fedf8 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -104,6 +104,8 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
| `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
| `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
| `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
+| `TotalRequests` | Total number of requests served by the RPC server. |
+| `TotalRequestsPerSecond` | Total number of requests per second served by the RPC server. |
RetryCache/NameNodeRetryCache
-----------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 5fc9cb5410b..5caabd22a88 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.ipc;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
@@ -84,6 +86,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
@@ -1697,6 +1700,61 @@ public class TestRPC extends TestRpcBase {
}
}
+ /**
+  * Queues 100000 external calls and verifies that the RPC metrics report that many
+  * total requests, and that a non-zero requests per second value is published within
+  * ten seconds.
+  */
+ @Test
+ public void testNumTotalRequestsMetrics() throws Exception {
+   UserGroupInformation ugi = UserGroupInformation.
+       createUserForTesting("userXyz", new String[0]);
+
+   final Server server = setupTestServer(conf, 1);
+
+   ExecutorService executorService = null;
+   try {
+     RpcMetrics rpcMetrics = server.getRpcMetrics();
+     assertEquals(0, rpcMetrics.getTotalRequests());
+     assertEquals(0, rpcMetrics.getTotalRequestsPerSecond());
+
+     List<ExternalCall<Void>> externalCallList = new ArrayList<>();
+
+     executorService = Executors.newSingleThreadExecutor(
+         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("testNumTotalRequestsMetrics")
+             .build());
+     AtomicInteger rps = new AtomicInteger(0);
+     CountDownLatch countDownLatch = new CountDownLatch(1);
+     executorService.submit(() -> {
+       // Poll until a non-zero rate is published. The loop is interruptible and backs off
+       // briefly so that it neither burns a core nor outlives the test on failure.
+       while (!Thread.currentThread().isInterrupted()) {
+         int numRps = (int) rpcMetrics.getTotalRequestsPerSecond();
+         if (numRps > 0) {
+           rps.set(numRps);
+           countDownLatch.countDown();
+           break;
+         }
+         try {
+           Thread.sleep(10);
+         } catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
+           break;
+         }
+       }
+     });
+
+     for (int i = 0; i < 100000; i++) {
+       externalCallList.add(newExtCall(ugi, () -> null));
+     }
+     for (ExternalCall<Void> externalCall : externalCallList) {
+       server.queueCall(externalCall);
+     }
+     for (ExternalCall<Void> externalCall : externalCallList) {
+       externalCall.get();
+     }
+
+     assertEquals(100000, rpcMetrics.getTotalRequests());
+     if (countDownLatch.await(10, TimeUnit.SECONDS)) {
+       assertTrue(rps.get() > 10);
+     } else {
+       throw new AssertionError("total requests per seconds are still 0");
+     }
+   } finally {
+     if (executorService != null) {
+       // shutdownNow() interrupts the poller; shutdown() would leave it spinning.
+       executorService.shutdownNow();
+     }
+     server.stop();
+   }
+ }
+
public static void main(String[] args) throws Exception {
new TestRPC().testCallsInternal(conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
index bc4cf3a6ee7..1564e41031a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
.getLogFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Lists;
@@ -106,6 +108,8 @@ public class TestJournalNodeSync {
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();
+ assertThat(jCluster.getJournalNode(0).getRpcServer().getRpcServer().getRpcMetrics()
+ .getTotalRequests()).isGreaterThan(20);
// Generate some edit logs and delete one.
long firstTxId = generateEditLog();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index decf85c06a4..ff6c2288b53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -123,6 +124,7 @@ public class TestConsistentReadsObserver {
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
+ assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(0);
dfs.create(testPath, (short)1).close();
assertSentTo(0);
@@ -132,6 +134,7 @@ public class TestConsistentReadsObserver {
// be triggered and client should retry active NN.
dfs.getFileStatus(testPath);
assertSentTo(0);
+ assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(1);
// reset the original call queue
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org