You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2021/01/08 07:46:42 UTC
[hadoop] branch trunk updated: HDFS-15754. Add DataNode packet
metrics (#2578)
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 87bd4d2 HDFS-15754. Add DataNode packet metrics (#2578)
87bd4d2 is described below
commit 87bd4d2aca5bdb81a4c6e4980763adf26ba106e8
Author: lfengnan <lf...@uber.com>
AuthorDate: Thu Jan 7 23:46:23 2021 -0800
HDFS-15754. Add DataNode packet metrics (#2578)
Contributed by Fengnan Li.
---
.../hadoop-common/src/site/markdown/Metrics.md | 4 ++
.../hadoop/hdfs/server/datanode/BlockReceiver.java | 48 +++++++++++++--------
.../server/datanode/DataNodeFaultInjector.java | 10 +++++
.../server/datanode/metrics/DataNodeMetrics.java | 21 ++++++++++
.../hdfs/server/datanode/TestDataNodeMetrics.java | 49 ++++++++++++++++++++++
5 files changed, 114 insertions(+), 18 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 18d3263..b93a11c 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -469,6 +469,10 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `CheckAndUpdateOpAvgTime` | Average time of check and update operations in milliseconds |
| `UpdateReplicaUnderRecoveryOpNumOps` | Total number of update replica under recovery operations |
| `UpdateReplicaUnderRecoveryOpAvgTime` | Average time of update replica under recovery operations in milliseconds |
+| `PacketsReceived` | Total number of packets received by the DataNode (excluding heartbeat packets from the client) |
+| `PacketsSlowWriteToMirror` | Total number of packets whose write to the downstream DataNode in the write pipeline took longer than the slow I/O threshold (`dfs.datanode.slow.io.warning.threshold.ms`, 300 ms by default) |
+| `PacketsSlowWriteToDisk` | Total number of packets whose write to disk took longer than the slow I/O threshold (`dfs.datanode.slow.io.warning.threshold.ms`, 300 ms by default) |
+| `PacketsSlowWriteToOsCache` | Total number of packets whose OS cache management after the write took longer than the slow I/O threshold (`dfs.datanode.slow.io.warning.threshold.ms`, 300 ms by default) |
FsVolume
--------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 825905f..cbff582 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -586,6 +586,7 @@ class BlockReceiver implements Closeable {
return 0;
}
+ datanode.metrics.incrPacketsReceived();
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
@@ -601,12 +602,15 @@ class BlockReceiver implements Closeable {
mirrorAddr,
duration);
trackSendPacketToLastNodeInPipeline(duration);
- if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
- LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
- + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
- + "downstream DNs=" + Arrays.toString(downstreamDNs)
- + ", blockId=" + replicaInfo.getBlockId()
- + ", seqno=" + seqno);
+ if (duration > datanodeSlowLogThresholdMs) {
+ datanode.metrics.incrPacketsSlowWriteToMirror();
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Slow BlockReceiver write packet to mirror took {}ms " +
+ "(threshold={}ms), downstream DNs={}, blockId={}, seqno={}",
+ duration, datanodeSlowLogThresholdMs,
+ Arrays.toString(downstreamDNs), replicaInfo.getBlockId(),
+ seqno);
+ }
}
} catch (IOException e) {
handleMirrorOutError(e);
@@ -736,13 +740,17 @@ class BlockReceiver implements Closeable {
long begin = Time.monotonicNow();
streams.writeDataToDisk(dataBuf.array(),
startByteToDisk, numBytesToDisk);
+ // no-op in prod
+ DataNodeFaultInjector.get().delayWriteToDisk();
long duration = Time.monotonicNow() - begin;
- if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
- LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
- + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
- + "volume=" + getVolumeBaseUri()
- + ", blockId=" + replicaInfo.getBlockId()
- + ", seqno=" + seqno);
+ if (duration > datanodeSlowLogThresholdMs) {
+ datanode.metrics.incrPacketsSlowWriteToDisk();
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Slow BlockReceiver write data to disk cost: {}ms " +
+ "(threshold={}ms), volume={}, blockId={}, seqno={}",
+ duration, datanodeSlowLogThresholdMs, getVolumeBaseUri(),
+ replicaInfo.getBlockId(), seqno);
+ }
}
if (duration > maxWriteToDiskMs) {
@@ -930,13 +938,17 @@ class BlockReceiver implements Closeable {
POSIX_FADV_DONTNEED);
}
lastCacheManagementOffset = offsetInBlock;
+ // For testing. Normally no-op.
+ DataNodeFaultInjector.get().delayWriteToOsCache();
long duration = Time.monotonicNow() - begin;
- if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
- LOG.warn("Slow manageWriterOsCache took " + duration
- + "ms (threshold=" + datanodeSlowLogThresholdMs
- + "ms), volume=" + getVolumeBaseUri()
- + ", blockId=" + replicaInfo.getBlockId()
- + ", seqno=" + seqno);
+ if (duration > datanodeSlowLogThresholdMs) {
+ datanode.metrics.incrPacketsSlowWriteToOsCache();
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Slow manageWriterOsCache took {}ms " +
+ "(threshold={}ms), volume={}, blockId={}, seqno={}",
+ duration, datanodeSlowLogThresholdMs, getVolumeBaseUri(),
+ replicaInfo.getBlockId(), seqno);
+ }
}
}
} catch (Throwable t) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index b55d793..b89a802 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -68,6 +68,16 @@ public class DataNodeFaultInjector {
}
/**
+   * Test hook invoked inside BlockReceiver's timed disk-write section, right
+   * after a packet's data has been written; any delay added here counts
+   * toward the slow-write-to-disk measurement.
+   */
+  public void delayWriteToDisk() {
+    // Intentionally empty in production; tests override it to inject latency.
+  }
+
+  /**
+   * Test hook invoked inside BlockReceiver's timed OS cache management
+   * section; any delay added here counts toward the slow-write-to-OS-cache
+   * measurement.
+   */
+  public void delayWriteToOsCache() {
+    // Intentionally empty in production; tests override it to inject latency.
+  }
+
+ /**
* Used as a hook to intercept the latency of sending ack.
*/
public void logDelaySendingAckToUpstream(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 16d1561..9350d95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -183,6 +183,11 @@ public class DataNodeMetrics {
@Metric private MutableRate checkAndUpdateOp;
@Metric private MutableRate updateReplicaUnderRecoveryOp;
+ @Metric MutableCounterLong packetsReceived;
+ @Metric MutableCounterLong packetsSlowWriteToMirror;
+ @Metric MutableCounterLong packetsSlowWriteToDisk;
+ @Metric MutableCounterLong packetsSlowWriteToOsCache;
+
final MetricsRegistry registry = new MetricsRegistry("datanode");
@Metric("Milliseconds spent on calling NN rpc")
private MutableRatesWithAggregation
@@ -690,4 +695,20 @@ public class DataNodeMetrics {
public void addUpdateReplicaUnderRecoveryOp(long latency) {
updateReplicaUnderRecoveryOp.add(latency);
}
+
+  /** Records one packet received by this DataNode. */
+  public void incrPacketsReceived() {
+    this.packetsReceived.incr();
+  }
+
+  /**
+   * Records one packet whose write to the downstream mirror was slow.
+   * The caller decides what counts as slow.
+   */
+  public void incrPacketsSlowWriteToMirror() {
+    this.packetsSlowWriteToMirror.incr();
+  }
+
+  /**
+   * Records one packet whose write to disk was slow.
+   * The caller decides what counts as slow.
+   */
+  public void incrPacketsSlowWriteToDisk() {
+    this.packetsSlowWriteToDisk.incr();
+  }
+
+  /**
+   * Records one packet whose OS cache management was slow.
+   * The caller decides what counts as slow.
+   */
+  public void incrPacketsSlowWriteToOsCache() {
+    this.packetsSlowWriteToOsCache.incr();
+  }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 51638c7..39ea21f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -161,6 +163,53 @@ public class TestDataNodeMetrics {
}
}
+  /**
+   * Verifies the DataNode packet metrics by injecting a 1s delay into the
+   * mirror-write, disk-write and OS-cache hooks and then writing a tiny file
+   * through a 3-node pipeline.
+   */
+  @Test
+  public void testReceivePacketSlowMetrics() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final int interval = 1;
+    conf.setInt(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, interval);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build();
+    // The fault injector is process-wide; remember the current one so the
+    // slow mock installed below does not leak into later tests.
+    final DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      final DataNodeFaultInjector injector =
+          Mockito.mock(DataNodeFaultInjector.class);
+      Answer<Void> answer = new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocationOnMock)
+            throws Throwable {
+          // Sleep well past the default 300ms slow I/O threshold so every
+          // hooked operation is counted as slow.
+          Thread.sleep(1000);
+          return null;
+        }
+      };
+      Mockito.doAnswer(answer).when(injector).
+          stopSendingPacketDownstream(Mockito.anyString());
+      Mockito.doAnswer(answer).when(injector).delayWriteToOsCache();
+      Mockito.doAnswer(answer).when(injector).delayWriteToDisk();
+      DataNodeFaultInjector.set(injector);
+      Path testFile = new Path("/testReceivePacketSlowMetrics.txt");
+      try (FSDataOutputStream fout = fs.create(testFile)) {
+        fout.write(new byte[1]);
+        fout.hsync();
+      }
+      List<DataNode> datanodes = cluster.getDataNodes();
+      // Counter names must match those documented in Metrics.md
+      // (PacketsReceived, PacketsSlowWriteToMirror, ...), with no "Total"
+      // prefix.
+      DataNode datanode = datanodes.get(0);
+      MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
+      assertTrue("More than 1 packet received",
+          getLongCounter("PacketsReceived", dnMetrics) > 1L);
+      assertCounter("PacketsSlowWriteToDisk", 1L, dnMetrics);
+      assertCounter("PacketsSlowWriteToOsCache", 0L, dnMetrics);
+      // Only DataNodes that forward to a downstream mirror record mirror
+      // writes, and datanodes.get(0) is not guaranteed to be at the head of
+      // the write pipeline, so sum this counter across all DataNodes.
+      long slowMirrorWrites = 0L;
+      for (DataNode dn : datanodes) {
+        slowMirrorWrites += getLongCounter("PacketsSlowWriteToMirror",
+            getMetrics(dn.getMetrics().name()));
+      }
+      assertTrue("More than 1 slow packet to mirror", slowMirrorWrites > 1L);
+    } finally {
+      // Restore first so cluster shutdown is not slowed by the mock.
+      DataNodeFaultInjector.set(oldInjector);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
/**
* HDFS-15242: This function ensures that writing causes some metrics
* of FSDatasetImpl to increment.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org