Posted to common-commits@hadoop.apache.org by su...@apache.org on 2021/01/08 07:46:42 UTC

[hadoop] branch trunk updated: HDFS-15754. Add DataNode packet metrics (#2578)

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 87bd4d2  HDFS-15754. Add DataNode packet metrics (#2578)
87bd4d2 is described below

commit 87bd4d2aca5bdb81a4c6e4980763adf26ba106e8
Author: lfengnan <lf...@uber.com>
AuthorDate: Thu Jan 7 23:46:23 2021 -0800

    HDFS-15754. Add DataNode packet metrics (#2578)
    
    Contributed by Fengnan Li.
---
 .../hadoop-common/src/site/markdown/Metrics.md     |  4 ++
 .../hadoop/hdfs/server/datanode/BlockReceiver.java | 48 +++++++++++++--------
 .../server/datanode/DataNodeFaultInjector.java     | 10 +++++
 .../server/datanode/metrics/DataNodeMetrics.java   | 21 ++++++++++
 .../hdfs/server/datanode/TestDataNodeMetrics.java  | 49 ++++++++++++++++++++++
 5 files changed, 114 insertions(+), 18 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 18d3263..b93a11c 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -469,6 +469,10 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `CheckAndUpdateOpAvgTime` | Average time of check and update operations in milliseconds |
 | `UpdateReplicaUnderRecoveryOpNumOps` | Total number of update replica under recovery operations |
 | `UpdateReplicaUnderRecoveryOpAvgTime` | Average time of update replica under recovery operations in milliseconds |
+| `PacketsReceived` | Total number of packets received by the Datanode (excluding heartbeat packets from clients) |
+| `PacketsSlowWriteToMirror` | Total number of packets whose write to the downstream Datanode in the pipeline takes longer than a configurable threshold (300ms by default) |
+| `PacketsSlowWriteToDisk` | Total number of packets whose write to disk takes longer than a configurable threshold (300ms by default) |
+| `PacketsSlowWriteToOsCache` | Total number of packets whose write to the OS cache takes longer than a configurable threshold (300ms by default) |
 
 FsVolume
 --------
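
A quick way to eyeball the new counters on a live cluster is the DataNode's
/jmx servlet. The probe below is a sketch, not part of this patch; it assumes
the default DataNode HTTP port (9864) and the DataNodeActivity bean-name
prefix, both of which may differ in your deployment.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class PacketMetricsProbe {
      public static void main(String[] args) throws Exception {
        // Query only the DataNodeActivity bean, which carries the counters
        // documented above (PacketsReceived, PacketsSlowWriteTo*).
        URL url = new URL("http://localhost:9864/jmx"
            + "?qry=Hadoop:service=DataNode,name=DataNodeActivity*");
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = in.readLine()) != null) {
            if (line.contains("PacketsReceived")
                || line.contains("PacketsSlowWrite")) {
              System.out.println(line.trim());
            }
          }
        }
      }
    }
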
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 825905f..cbff582 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -586,6 +586,7 @@ class BlockReceiver implements Closeable {
       return 0;
     }
 
+    datanode.metrics.incrPacketsReceived();
     //First write the packet to the mirror:
     if (mirrorOut != null && !mirrorError) {
       try {
@@ -601,12 +602,15 @@ class BlockReceiver implements Closeable {
             mirrorAddr,
             duration);
         trackSendPacketToLastNodeInPipeline(duration);
-        if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
-          LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
-              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
-              + "downstream DNs=" + Arrays.toString(downstreamDNs)
-              + ", blockId=" + replicaInfo.getBlockId()
-              + ", seqno=" + seqno);
+        if (duration > datanodeSlowLogThresholdMs) {
+          datanode.metrics.incrPacketsSlowWriteToMirror();
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Slow BlockReceiver write packet to mirror took {}ms " +
+                "(threshold={}ms), downstream DNs={}, blockId={}, seqno={}",
+                duration, datanodeSlowLogThresholdMs,
+                Arrays.toString(downstreamDNs), replicaInfo.getBlockId(),
+                seqno);
+          }
         }
       } catch (IOException e) {
         handleMirrorOutError(e);
@@ -736,13 +740,17 @@ class BlockReceiver implements Closeable {
           long begin = Time.monotonicNow();
           streams.writeDataToDisk(dataBuf.array(),
               startByteToDisk, numBytesToDisk);
+          // Fault-injection hook for tests; no-op in production.
+          DataNodeFaultInjector.get().delayWriteToDisk();
           long duration = Time.monotonicNow() - begin;
-          if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
-            LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
-                + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
-                + "volume=" + getVolumeBaseUri()
-                + ", blockId=" + replicaInfo.getBlockId()
-                + ", seqno=" + seqno);
+          if (duration > datanodeSlowLogThresholdMs) {
+            datanode.metrics.incrPacketsSlowWriteToDisk();
+            if (LOG.isWarnEnabled()) {
+              LOG.warn("Slow BlockReceiver write data to disk cost: {}ms " +
+                      "(threshold={}ms), volume={}, blockId={}, seqno={}",
+                  duration, datanodeSlowLogThresholdMs, getVolumeBaseUri(),
+                  replicaInfo.getBlockId(), seqno);
+            }
           }
 
           if (duration > maxWriteToDiskMs) {
@@ -930,13 +938,17 @@ class BlockReceiver implements Closeable {
               POSIX_FADV_DONTNEED);
         }
         lastCacheManagementOffset = offsetInBlock;
+        // For testing. Normally no-op.
+        DataNodeFaultInjector.get().delayWriteToOsCache();
         long duration = Time.monotonicNow() - begin;
-        if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
-          LOG.warn("Slow manageWriterOsCache took " + duration
-              + "ms (threshold=" + datanodeSlowLogThresholdMs
-              + "ms), volume=" + getVolumeBaseUri()
-              + ", blockId=" + replicaInfo.getBlockId()
-              + ", seqno=" + seqno);
+        if (duration > datanodeSlowLogThresholdMs) {
+          datanode.metrics.incrPacketsSlowWriteToOsCache();
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Slow manageWriterOsCache took {}ms " +
+                    "(threshold={}ms), volume={}, blockId={}, seqno={}",
+                duration, datanodeSlowLogThresholdMs, getVolumeBaseUri(),
+                replicaInfo.getBlockId(), seqno);
+          }
         }
       }
     } catch (Throwable t) {
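
All three branches above compare against the same datanodeSlowLogThresholdMs,
which corresponds to the 300ms default mentioned in Metrics.md and appears to
be backed by dfs.datanode.slow.io.warning.threshold.ms. A minimal sketch of
lowering it so that slow-write events (and the new counters) trip more
readily; the 50ms value is arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class SlowIoThresholdSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Lower the slow-I/O warning threshold from its 300ms default.
        conf.setLong(
            DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, 50L);
        System.out.println("slow io threshold = " + conf.getLong(
            DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, 300L)
            + "ms");
      }
    }
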
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index b55d793..b89a802 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -68,6 +68,16 @@ public class DataNodeFaultInjector {
   }
 
   /**
+   * Used as a hook to delay writing a packet to disk.
+   */
+  public void delayWriteToDisk() {}
+
+  /**
+   * Used as a hook to delay writing a packet to os cache.
+   */
+  public void delayWriteToOsCache() {}
+
+  /**
    * Used as a hook to intercept the latency of sending ack.
    */
   public void logDelaySendingAckToUpstream(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 16d1561..9350d95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -183,6 +183,11 @@ public class DataNodeMetrics {
   @Metric private MutableRate checkAndUpdateOp;
   @Metric private MutableRate updateReplicaUnderRecoveryOp;
 
+  @Metric MutableCounterLong packetsReceived;
+  @Metric MutableCounterLong packetsSlowWriteToMirror;
+  @Metric MutableCounterLong packetsSlowWriteToDisk;
+  @Metric MutableCounterLong packetsSlowWriteToOsCache;
+
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   @Metric("Milliseconds spent on calling NN rpc")
   private MutableRatesWithAggregation
@@ -690,4 +695,20 @@ public class DataNodeMetrics {
   public void addUpdateReplicaUnderRecoveryOp(long latency) {
     updateReplicaUnderRecoveryOp.add(latency);
   }
+
+  public void incrPacketsReceived() {
+    packetsReceived.incr();
+  }
+
+  public void incrPacketsSlowWriteToMirror() {
+    packetsSlowWriteToMirror.incr();
+  }
+
+  public void incrPacketsSlowWriteToDisk() {
+    packetsSlowWriteToDisk.incr();
+  }
+
+  public void incrPacketsSlowWriteToOsCache() {
+    packetsSlowWriteToOsCache.incr();
+  }
 }
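
A note on naming: a bare @Metric annotation publishes a counter under the
capitalized field name, so packetsReceived surfaces as PacketsReceived,
matching the Metrics.md rows added above. A self-contained sketch of the
equivalent hand-wired counter (the name and description strings here are
illustrative only):

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableCounterLong;

    public class CounterNamingSketch {
      public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry("datanode");
        // Roughly what @Metric generates for the new fields: a monotonically
        // increasing long counter that is safe to incr() across threads.
        MutableCounterLong packetsReceived = registry.newCounter(
            "PacketsReceived", "Total number of packets received", 0L);
        packetsReceived.incr();
        System.out.println(packetsReceived.value());  // prints 1
      }
    }
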
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 51638c7..39ea21f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -161,6 +163,53 @@ public class TestDataNodeMetrics {
     }
   }
 
+  @Test
+  public void testReceivePacketSlowMetrics() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final int interval = 1;
+    conf.setInt(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, interval);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      final DataNodeFaultInjector injector =
+          Mockito.mock(DataNodeFaultInjector.class);
+      Answer<Object> answer = new Answer<Object>() {
+        @Override
+        public Object answer(InvocationOnMock invocationOnMock)
+            throws Throwable {
+          // Make the operation take long enough to count as slow.
+          Thread.sleep(1000);
+          return null;
+        }
+      };
+      Mockito.doAnswer(answer).when(injector).
+          stopSendingPacketDownstream(Mockito.anyString());
+      Mockito.doAnswer(answer).when(injector).delayWriteToOsCache();
+      Mockito.doAnswer(answer).when(injector).delayWriteToDisk();
+      DataNodeFaultInjector.set(injector);
+      Path testFile = new Path("/testReceivePacketSlowMetrics.txt");
+      FSDataOutputStream fout = fs.create(testFile);
+      fout.write(new byte[1]);
+      fout.hsync();
+      fout.close();
+      List<DataNode> datanodes = cluster.getDataNodes();
+      DataNode datanode = datanodes.get(0);
+      MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
+      assertTrue("Expected more than 1 packet received",
+          getLongCounter("PacketsReceived", dnMetrics) > 1L);
+      assertTrue("Expected more than 1 slow packet write to mirror",
+          getLongCounter("PacketsSlowWriteToMirror", dnMetrics) > 1L);
+      assertCounter("PacketsSlowWriteToDisk", 1L, dnMetrics);
+      assertCounter("PacketsSlowWriteToOsCache", 0L, dnMetrics);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * HDFS-15242: This function ensures that writing causes some metrics
    * of FSDatasetImpl to increment.

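To run just the new test locally, the standard surefire invocation from the
hadoop-hdfs module should work:

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestDataNodeMetrics#testReceivePacketSlowMetrics
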
