Posted to common-commits@hadoop.apache.org by we...@apache.org on 2020/04/01 23:37:39 UTC

[hadoop] branch trunk updated: HDFS-15242. Add metrics for operations hold lock times of FsDatasetImpl. Contributed by Xiaoqiao He.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d3b5951  HDFS-15242. Add metrics for operations hold lock times of FsDatasetImpl. Contributed by Xiaoqiao He.
d3b5951 is described below

commit d3b595157256e198c4340d555e14ad6144f2eaa1
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Wed Apr 1 16:34:25 2020 -0700

    HDFS-15242. Add metrics for operations hold lock times of FsDatasetImpl. Contributed by Xiaoqiao He.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    Reviewed-by: Inigo Goiri <in...@apache.org>
---
 .../hadoop-common/src/site/markdown/Metrics.md     | 16 ++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java     | 95 +++++++++++++++++-----
 .../server/datanode/metrics/DataNodeMetrics.java   | 74 +++++++++++++++++
 .../hdfs/server/datanode/TestDataNodeMetrics.java  | 45 ++++++++++
 4 files changed, 210 insertions(+), 20 deletions(-)
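
The FsDatasetImpl changes in the diff below all follow one pattern: take a start
timestamp with Time.monotonicNow() before acquiring the dataset write lock, do the work,
and in a finally block report the elapsed milliseconds to a new DataNodeMetrics rate. A
minimal sketch of that pattern, assuming hypothetical stand-ins; LockTimingSketch and
MetricsSink are illustrative names, not code from this patch.

import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Time;

public class LockTimingSketch {
  private final AutoCloseableLock datasetWriteLock = new AutoCloseableLock();
  private final MetricsSink dataNodeMetrics;

  public LockTimingSketch(MetricsSink metrics) {
    this.dataNodeMetrics = metrics;
  }

  public void timedOperation() {
    long startTimeMs = Time.monotonicNow();
    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
      // work performed while holding the dataset write lock
    } finally {
      // Same defensive null check the patch uses before recording the sample.
      if (dataNodeMetrics != null) {
        dataNodeMetrics.record(Time.monotonicNow() - startTimeMs);
      }
    }
  }

  /** Hypothetical stand-in for DataNodeMetrics.addCreateRbwOp(long) and friends. */
  public interface MetricsSink {
    void record(long latencyMs);
  }
}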

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 8210eee..43a3f33 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -453,6 +453,22 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker |
 | `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker |
 | `EcReconstructionRemoteBytesRead` | Total number of bytes remote read by erasure coding worker |
+| `CreateRbwOpNumOps` | Total number of create rbw operations |
+| `CreateRbwOpAvgTime` | Average time of create rbw operations in milliseconds |
+| `RecoverRbwOpNumOps` | Total number of recover rbw operations |
+| `RecoverRbwOpAvgTime` | Average time of recover rbw operations in milliseconds |
+| `ConvertTemporaryToRbwOpNumOps` | Total number of convert temporary to rbw operations |
+| `ConvertTemporaryToRbwOpAvgTime` | Average time of convert temporary to rbw operations in milliseconds |
+| `CreateTemporaryOpNumOps` | Total number of create temporary operations |
+| `CreateTemporaryOpAvgTime` | Average time of create temporary operations in milliseconds |
+| `FinalizeBlockOpNumOps` | Total number of finalize block operations |
+| `FinalizeBlockOpAvgTime` | Average time of finalize block operations in milliseconds |
+| `UnfinalizeBlockOpNumOps` | Total number of un-finalize block operations |
+| `UnfinalizeBlockOpAvgTime` | Average time of un-finalize block operations in milliseconds |
+| `CheckAndUpdateOpNumOps` | Total number of check and update operations |
+| `CheckAndUpdateOpAvgTime` | Average time of check and update operations in milliseconds |
+| `UpdateReplicaUnderRecoveryOpNumOps` | Total number of update replica under recovery operations |
+| `UpdateReplicaUnderRecoveryOpAvgTime` | Average time of update replica under recovery operations in milliseconds |
 
 FsVolume
 --------
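
Each NumOps/AvgTime pair documented in the Metrics.md additions above comes from a single
MutableRate field that this patch adds to DataNodeMetrics: the @Metric annotation
capitalizes the field name (createRbwOp becomes CreateRbwOp) and the metrics2 library then
publishes a <Name>NumOps counter plus a <Name>AvgTime gauge. A small self-contained sketch
of that naming behavior; the registry name and sample latencies below are illustrative.

import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;

public class RateNamingSketch {
  public static void main(String[] args) {
    MetricsRegistry registry = new MetricsRegistry("datanode");
    MutableRate createRbwOp = registry.newRate("CreateRbwOp");

    // Each add() bumps CreateRbwOpNumOps by one and folds the latency into the
    // average that is published as CreateRbwOpAvgTime (in milliseconds).
    createRbwOp.add(12);
    createRbwOp.add(8);
    // A metrics snapshot taken now would report NumOps = 2 and AvgTime = 10.0.
  }
}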
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 39152fc..d6b50e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -243,6 +244,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
     
   final DataNode datanode;
+  private final DataNodeMetrics dataNodeMetrics;
   final DataStorage dataStorage;
   private final FsVolumeList volumes;
   final Map<String, DatanodeStorage> storageMap;
@@ -284,6 +286,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ) throws IOException {
     this.fsRunning = true;
     this.datanode = datanode;
+    this.dataNodeMetrics = datanode.getMetrics();
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
@@ -1425,6 +1428,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaHandler createRbw(
       StorageType storageType, String storageId, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
+    long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
@@ -1485,6 +1489,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
       return new ReplicaHandler(newReplicaInfo, ref);
+    } finally {
+      if (dataNodeMetrics != null) {
+        long createRbwMs = Time.monotonicNow() - startTimeMs;
+        dataNodeMetrics.addCreateRbwOp(createRbwMs);
+      }
     }
   }
 
@@ -1493,27 +1502,34 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
     LOG.info("Recover RBW replica " + b);
-
-    while (true) {
-      try {
-        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
-          ReplicaInfo replicaInfo =
-              getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
-          // check the replica's state
-          if (replicaInfo.getState() != ReplicaState.RBW) {
-            throw new ReplicaNotFoundException(
-                ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
-          }
-          ReplicaInPipeline rbw = (ReplicaInPipeline)replicaInfo;
-          if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
-            throw new MustStopExistingWriter(rbw);
+    long startTimeMs = Time.monotonicNow();
+    try {
+      while (true) {
+        try {
+          try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+            ReplicaInfo replicaInfo =
+                getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
+            // check the replica's state
+            if (replicaInfo.getState() != ReplicaState.RBW) {
+              throw new ReplicaNotFoundException(
+                  ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
+            }
+            ReplicaInPipeline rbw = (ReplicaInPipeline) replicaInfo;
+            if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
+              throw new MustStopExistingWriter(rbw);
+            }
+            LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw);
+            return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
           }
-          LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw);
-          return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
+        } catch (MustStopExistingWriter e) {
+          e.getReplicaInPipeline().stopWriter(
+              datanode.getDnConf().getXceiverStopTimeout());
         }
-      } catch (MustStopExistingWriter e) {
-        e.getReplicaInPipeline().stopWriter(
-            datanode.getDnConf().getXceiverStopTimeout());
+      }
+    } finally {
+      if (dataNodeMetrics != null) {
+        long recoverRbwMs = Time.monotonicNow() - startTimeMs;
+        dataNodeMetrics.addRecoverRbwOp(recoverRbwMs);
       }
     }
   }
@@ -1581,7 +1597,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
-
+    long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final long blockId = b.getBlockId();
       final long expectedGs = b.getGenerationStamp();
@@ -1637,6 +1653,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // overwrite the RBW in the volume map
       volumeMap.add(b.getBlockPoolId(), rbw.getReplicaInfo());
       return rbw;
+    } finally {
+      if (dataNodeMetrics != null) {
+        long convertTemporaryToRbwMs = Time.monotonicNow() - startTimeMs;
+        dataNodeMetrics.addConvertTemporaryToRbwOp(convertTemporaryToRbwMs);
+      }
     }
   }
 
@@ -1701,6 +1722,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // Stop the previous writer
       ((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs);
     } while (true);
+    long holdLockTimeMs = Time.monotonicNow() - startTimeMs;
     if (lastFoundReplicaInfo != null
         && !isReplicaProvided(lastFoundReplicaInfo)) {
       // Old blockfile should be deleted synchronously as it might collide
@@ -1709,6 +1731,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
           false);
     }
+    long startHoldLockTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
           .getNumBytes());
@@ -1723,6 +1746,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
       return new ReplicaHandler(newReplicaInfo, ref);
+    } finally {
+      if (dataNodeMetrics != null) {
+        // The create temporary operation holds the write lock twice; record the combined hold time.
+        long createTemporaryOpMs = Time.monotonicNow() - startHoldLockTimeMs
+            + holdLockTimeMs;
+        dataNodeMetrics.addCreateTemporaryOp(createTemporaryOpMs);
+      }
     }
   }
 
@@ -1760,6 +1790,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     ReplicaInfo replicaInfo = null;
     ReplicaInfo finalizedReplicaInfo = null;
+    long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       if (Thread.interrupted()) {
         // Don't allow data modifications from interrupted threads
@@ -1772,6 +1803,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         return;
       }
       finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo);
+    } finally {
+      if (dataNodeMetrics != null) {
+        long finalizeBlockMs = Time.monotonicNow() - startTimeMs;
+        dataNodeMetrics.addFinalizeBlockOp(finalizeBlockMs);
+      }
     }
     /*
      * Sync the directory after rename from tmp/rbw to Finalized if
@@ -1836,6 +1872,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override // FsDatasetSpi
   public void unfinalizeBlock(ExtendedBlock b) throws IOException {
+    long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getLocalBlock());
@@ -1853,6 +1890,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               b.getBlockId(), true);
         }
       }
+    } finally {
+      if (dataNodeMetrics != null) {
+        long unFinalizedBlockMs = Time.monotonicNow() - startTimeMs;
+        dataNodeMetrics.addUnfinalizeBlockOp(unFinalizedBlockMs);
+      }
     }
   }
 
@@ -2406,6 +2448,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
+    long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
@@ -2581,6 +2624,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             + memBlockInfo.getBlockDataLength());
         memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength());
       }
+    } finally {
+      if (dataNodeMetrics != null) {
+        long checkAndUpdateTimeMs = Time.monotonicNow() - startTimeMs;
+        dataNodeMetrics.addCheckAndUpdateOp(checkAndUpdateTimeMs);
+      }
     }
 
     // Send corrupt block report outside the lock
@@ -2714,6 +2762,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                                     final long recoveryId,
                                     final long newBlockId,
                                     final long newlength) throws IOException {
+    long startTimeMs = Time.monotonicNow();
     try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       //get replica
       final String bpid = oldBlock.getBlockPoolId();
@@ -2770,6 +2819,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       checkReplicaFiles(finalized);
 
       return finalized;
+    } finally {
+      if (dataNodeMetrics != null) {
+        long updateReplicaUnderRecoveryMs = Time.monotonicNow() - startTimeMs;
+        dataNodeMetrics.addUpdateReplicaUnderRecoveryOp(
+            updateReplicaUnderRecoveryMs);
+      }
     }
   }
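
One place in FsDatasetImpl.java above deviates from the single try/finally pattern:
createTemporary acquires the dataset write lock twice, once while stopping any previous
writer and again after a colliding old block file is cleaned up in between, so the patch
adds the two hold intervals together instead of timing the whole method. A rough
self-contained sketch of that two-phase accounting; the class name and phase comments are
illustrative, and the measured intervals approximate the actual lock hold time.

import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Time;

public class TwoPhaseLockTimingSketch {
  private final AutoCloseableLock datasetWriteLock = new AutoCloseableLock();

  public long twoPhaseOperation() {
    long startTimeMs = Time.monotonicNow();
    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
      // first locked phase, e.g. finding and stopping a previous writer
    }
    long firstHoldMs = Time.monotonicNow() - startTimeMs;

    // work done between the two phases (e.g. deleting an old block file) is not counted

    long secondStartMs = Time.monotonicNow();
    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
      // second locked phase, e.g. picking a volume and registering the new replica
    }
    // The value reported to CreateTemporaryOp is the sum of the two hold intervals.
    return (Time.monotonicNow() - secondStartMs) + firstHoldMs;
  }

  public static void main(String[] args) {
    System.out.println("held for " + new TwoPhaseLockTimingSketch().twoPhaseOperation() + " ms");
  }
}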
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index cc80237..00093f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -167,6 +167,16 @@ public class DataNodeMetrics {
   @Metric("Rate of processed commands of all BPServiceActors")
   private MutableRate processedCommandsOp;
 
+  // FsDatasetImpl local file process metrics.
+  @Metric private MutableRate createRbwOp;
+  @Metric private MutableRate recoverRbwOp;
+  @Metric private MutableRate convertTemporaryToRbwOp;
+  @Metric private MutableRate createTemporaryOp;
+  @Metric private MutableRate finalizeBlockOp;
+  @Metric private MutableRate unfinalizeBlockOp;
+  @Metric private MutableRate checkAndUpdateOp;
+  @Metric private MutableRate updateReplicaUnderRecoveryOp;
+
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   @Metric("Milliseconds spent on calling NN rpc")
   private MutableRatesWithAggregation
@@ -574,4 +584,68 @@ public class DataNodeMetrics {
   public void addNumProcessedCommands(long latency) {
     processedCommandsOp.add(latency);
   }
+
+  /**
+   * Add createRbwOp metrics.
+   * @param latency milliseconds taken to create an RBW file
+   */
+  public void addCreateRbwOp(long latency) {
+    createRbwOp.add(latency);
+  }
+
+  /**
+   * Add recoverRbwOp metrics.
+   * @param latency milliseconds taken to recover an RBW file
+   */
+  public void addRecoverRbwOp(long latency) {
+    recoverRbwOp.add(latency);
+  }
+
+  /**
+   * Add convertTemporaryToRbwOp metrics.
+   * @param latency milliseconds taken to convert a temporary replica to RBW
+   */
+  public void addConvertTemporaryToRbwOp(long latency) {
+    convertTemporaryToRbwOp.add(latency);
+  }
+
+  /**
+   * Add createTemporaryOp metrics.
+   * @param latency milliseconds taken to create a temporary block file
+   */
+  public void addCreateTemporaryOp(long latency) {
+    createTemporaryOp.add(latency);
+  }
+
+  /**
+   * Add finalizeBlockOp metrics.
+   * @param latency milliseconds taken to finalize a block
+   */
+  public void addFinalizeBlockOp(long latency) {
+    finalizeBlockOp.add(latency);
+  }
+
+  /**
+   * Add unfinalizeBlockOp metrics.
+   * @param latency milliseconds taken to un-finalize a block file
+   */
+  public void addUnfinalizeBlockOp(long latency) {
+    unfinalizeBlockOp.add(latency);
+  }
+
+  /**
+   * Add checkAndUpdateOp metrics.
+   * @param latency milliseconds taken to check and update a block file
+   */
+  public void addCheckAndUpdateOp(long latency) {
+    checkAndUpdateOp.add(latency);
+  }
+
+  /**
+   * Add updateReplicaUnderRecoveryOp metrics.
+   * @param latency milliseconds taken to update a replica under recovery
+   */
+  public void addUpdateReplicaUnderRecoveryOp(long latency) {
+    updateReplicaUnderRecoveryOp.add(latency);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index b4e2640..81d144f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -37,6 +37,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
 import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,6 +161,50 @@ public class TestDataNodeMetrics {
   }
 
   /**
+   * HDFS-15242: This test ensures that writing to a file causes some
+   * FsDatasetImpl metrics to increment.
+   */
+  @Test
+  public void testFsDatasetMetrics() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId();
+      List<DataNode> datanodes = cluster.getDataNodes();
+      DataNode datanode = datanodes.get(0);
+
+      // Verify that these metrics are 0 right after initialization.
+      MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+      assertCounter("CreateRbwOpNumOps", 0L, rb);
+      assertCounter("CreateTemporaryOpNumOps", 0L, rb);
+      assertCounter("FinalizeBlockOpNumOps", 0L, rb);
+
+      // Write into a file to trigger DN metrics.
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path testFile = new Path("/testBlockMetrics.txt");
+      FSDataOutputStream fout = fs.create(testFile);
+      fout.write(new byte[1]);
+      fout.hsync();
+      fout.close();
+
+      // Create temporary block file to trigger DN metrics.
+      final ExtendedBlock block = new ExtendedBlock(bpid, 1, 1, 2001);
+      datanode.data.createTemporary(StorageType.DEFAULT, null, block, false);
+
+      // Verify that the metrics have been updated by the operations above.
+      rb = getMetrics(datanode.getMetrics().name());
+      assertCounter("CreateRbwOpNumOps", 1L, rb);
+      assertCounter("CreateTemporaryOpNumOps", 1L, rb);
+      assertCounter("FinalizeBlockOpNumOps", 1L, rb);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
    * Tests that round-trip acks in a datanode write pipeline are correctly 
    * measured. 
    */
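
With the patch applied, the new counters and averages appear alongside the existing
DataNode metrics, so they reach whatever metrics2 sink is configured and can also be read
ad hoc from the DataNode's /jmx endpoint. A rough sketch of the latter; the host, the port
(9864 is the default dfs.datanode.http.address port in Hadoop 3.x), and the bean query are
deployment-specific assumptions, and the line-based filtering relies on the servlet's
pretty-printed JSON output.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class DumpFsDatasetOpMetrics {
  public static void main(String[] args) throws Exception {
    // DataNodeActivity is the metrics record that carries CreateRbwOpNumOps and friends.
    URL url = new URL("http://localhost:9864/jmx"
        + "?qry=Hadoop:service=DataNode,name=DataNodeActivity*");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        // Keep only per-operation counters and averages, including the new FsDatasetImpl ones.
        if (line.contains("OpNumOps") || line.contains("OpAvgTime")) {
          System.out.println(line.trim());
        }
      }
    }
  }
}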


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org