Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/09/20 00:47:50 UTC

[GitHub] [hadoop] ZanderXu commented on a diff in pull request #4903: HDFS-16774. Improve async delete replica on datanode

ZanderXu commented on code in PR #4903:
URL: https://github.com/apache/hadoop/pull/4903#discussion_r974787840


##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -359,6 +371,89 @@ public void run() {
         IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
     }
+
+    private boolean removeReplicaFromMem() {
+      try (AutoCloseableLock lock = fsdatasetImpl.acquireDatasetLockManager().writeLock(
+          DataNodeLockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
+        final ReplicaInfo info = fsdatasetImpl.volumeMap
+            .get(block.getBlockPoolId(), block.getLocalBlock());
+        if (info == null) {
+          ReplicaInfo infoByBlockId =
+              fsdatasetImpl.volumeMap.get(block.getBlockPoolId(),
+                  block.getLocalBlock().getBlockId());
+          if (infoByBlockId == null) {
+            // It is okay if the block is not found -- it
+            // may be deleted earlier.
+            LOG.info("Failed to delete replica " + block.getLocalBlock()
+                + ": ReplicaInfo not found in removeReplicaFromMem.");

Review Comment:
   You can use parameterized logging here: `LOG.info("...{}...", block.getLocalBlock())`
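   For example:
   ```
   LOG.info("Failed to delete replica {}: ReplicaInfo not found in removeReplicaFromMem.",
       block.getLocalBlock());
   ```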



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -359,6 +371,89 @@ public void run() {
         IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
     }
+
+    private boolean removeReplicaFromMem() {
+      try (AutoCloseableLock lock = fsdatasetImpl.acquireDatasetLockManager().writeLock(
+          DataNodeLockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
+        final ReplicaInfo info = fsdatasetImpl.volumeMap
+            .get(block.getBlockPoolId(), block.getLocalBlock());
+        if (info == null) {
+          ReplicaInfo infoByBlockId =
+              fsdatasetImpl.volumeMap.get(block.getBlockPoolId(),
+                  block.getLocalBlock().getBlockId());
+          if (infoByBlockId == null) {
+            // It is okay if the block is not found -- it
+            // may be deleted earlier.
+            LOG.info("Failed to delete replica " + block.getLocalBlock()
+                + ": ReplicaInfo not found in removeReplicaFromMem.");
+          } else {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                + ": GenerationStamp not matched, existing replica is "
+                + Block.toString(infoByBlockId) + " in removeReplicaFromMem.");
+          }
+          return false;
+        }
+
+        FsVolumeImpl v = (FsVolumeImpl)info.getVolume();
+        if (v == null) {
+          LOG.error("Failed to delete replica " + block.getLocalBlock()
+              +  ". No volume for this replica " + info + " in removeReplicaFromMem.");
+          return false;
+        }
+
+        try {
+          File blockFile = new File(info.getBlockURI());
+          if (blockFile != null && blockFile.getParentFile() == null) {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                +  ". Parent not found for block file: " + blockFile
+                + " in removeReplicaFromMem.");
+            return false;
+          }
+        } catch(IllegalArgumentException e) {
+          LOG.warn("Parent directory check failed; replica {} is " +
+              "not backed by a local file in removeReplicaFromMem.", info);
+        }
+
+        if (!this.volume.getStorageID().equals(v.getStorageID())) {
+          LOG.error("Failed to delete replica " + block.getLocalBlock()
+              +  ". Appear different volumes, oldVolume=" + this.volume + " and newVolume=" + v
+              +  " for this replica in removeReplicaFromMem.");

Review Comment:
   Same here: please switch this to parameterized logging.
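   For example:
   ```
   LOG.error("Failed to delete replica {}. Appear different volumes, oldVolume={} and newVolume={}"
       + " for this replica in removeReplicaFromMem.", block.getLocalBlock(), this.volume, v);
   ```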



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAysncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up replication interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Lets wait for the remove replia process
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock());
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      ds.invalidate(bpid, blockList.toArray(new Block[0]));
+
+      // Test get blocks and datanodes.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(3, loc.length);
+      List<String> uuids = com.google.common.collect.Lists.newArrayList();

Review Comment:
   Here too: drop the `com.google.common.collect.` prefix and use `Lists.newArrayList()`.
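   For example, assuming `Lists` is imported at the top of the file:
   ```
   List<String> uuids = Lists.newArrayList();
   ```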



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -359,6 +371,89 @@ public void run() {
         IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
     }
+
+    private boolean removeReplicaFromMem() {
+      try (AutoCloseableLock lock = fsdatasetImpl.acquireDatasetLockManager().writeLock(
+          DataNodeLockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
+        final ReplicaInfo info = fsdatasetImpl.volumeMap
+            .get(block.getBlockPoolId(), block.getLocalBlock());
+        if (info == null) {
+          ReplicaInfo infoByBlockId =
+              fsdatasetImpl.volumeMap.get(block.getBlockPoolId(),
+                  block.getLocalBlock().getBlockId());
+          if (infoByBlockId == null) {
+            // It is okay if the block is not found -- it
+            // may be deleted earlier.
+            LOG.info("Failed to delete replica " + block.getLocalBlock()
+                + ": ReplicaInfo not found in removeReplicaFromMem.");
+          } else {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                + ": GenerationStamp not matched, existing replica is "
+                + Block.toString(infoByBlockId) + " in removeReplicaFromMem.");

Review Comment:
   Here too, use parameterized logging, e.g. `LOG.error("...{}...", ...)`
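   For example:
   ```
   LOG.error("Failed to delete replica {}: GenerationStamp not matched, existing replica is {}"
       + " in removeReplicaFromMem.", block.getLocalBlock(), Block.toString(infoByBlockId));
   ```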



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -334,6 +340,12 @@ private boolean moveFiles() {
     @Override
     public void run() {
       try {
+        // For testing. Normally no-op.
+        DataNodeFaultInjector.get().delayDeleteReplica();
+        if (!removeReplicaFromMem()){

Review Comment:
   Could you add a comment here describing this?
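   For example, something like:
   ```
   // Remove the replica from the in-memory volume map first; only delete
   // the on-disk files if that succeeds.
   if (!removeReplicaFromMem()) {
   ```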



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAysncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up replication interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();

Review Comment:
   Change this code to one line, such as:
   ```
   MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
   ```



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -359,6 +371,89 @@ public void run() {
         IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
     }
+
+    private boolean removeReplicaFromMem() {
+      try (AutoCloseableLock lock = fsdatasetImpl.acquireDatasetLockManager().writeLock(
+          DataNodeLockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
+        final ReplicaInfo info = fsdatasetImpl.volumeMap
+            .get(block.getBlockPoolId(), block.getLocalBlock());
+        if (info == null) {
+          ReplicaInfo infoByBlockId =
+              fsdatasetImpl.volumeMap.get(block.getBlockPoolId(),
+                  block.getLocalBlock().getBlockId());
+          if (infoByBlockId == null) {
+            // It is okay if the block is not found -- it
+            // may be deleted earlier.
+            LOG.info("Failed to delete replica " + block.getLocalBlock()
+                + ": ReplicaInfo not found in removeReplicaFromMem.");
+          } else {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                + ": GenerationStamp not matched, existing replica is "
+                + Block.toString(infoByBlockId) + " in removeReplicaFromMem.");
+          }
+          return false;
+        }
+
+        FsVolumeImpl v = (FsVolumeImpl)info.getVolume();
+        if (v == null) {
+          LOG.error("Failed to delete replica " + block.getLocalBlock()
+              +  ". No volume for this replica " + info + " in removeReplicaFromMem.");
+          return false;
+        }
+
+        try {
+          File blockFile = new File(info.getBlockURI());
+          if (blockFile != null && blockFile.getParentFile() == null) {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                +  ". Parent not found for block file: " + blockFile
+                + " in removeReplicaFromMem.");
+            return false;
+          }
+        } catch(IllegalArgumentException e) {

Review Comment:
   Nit: can change it to `} catch (IllegalArgumentException e) {` (space after `catch`).



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -359,6 +371,89 @@ public void run() {
         IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
     }
+
+    private boolean removeReplicaFromMem() {
+      try (AutoCloseableLock lock = fsdatasetImpl.acquireDatasetLockManager().writeLock(
+          DataNodeLockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
+        final ReplicaInfo info = fsdatasetImpl.volumeMap
+            .get(block.getBlockPoolId(), block.getLocalBlock());
+        if (info == null) {
+          ReplicaInfo infoByBlockId =
+              fsdatasetImpl.volumeMap.get(block.getBlockPoolId(),
+                  block.getLocalBlock().getBlockId());
+          if (infoByBlockId == null) {
+            // It is okay if the block is not found -- it
+            // may be deleted earlier.
+            LOG.info("Failed to delete replica " + block.getLocalBlock()
+                + ": ReplicaInfo not found in removeReplicaFromMem.");
+          } else {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                + ": GenerationStamp not matched, existing replica is "
+                + Block.toString(infoByBlockId) + " in removeReplicaFromMem.");
+          }
+          return false;
+        }
+
+        FsVolumeImpl v = (FsVolumeImpl)info.getVolume();
+        if (v == null) {
+          LOG.error("Failed to delete replica " + block.getLocalBlock()
+              +  ". No volume for this replica " + info + " in removeReplicaFromMem.");
+          return false;
+        }
+
+        try {
+          File blockFile = new File(info.getBlockURI());
+          if (blockFile != null && blockFile.getParentFile() == null) {

Review Comment:
   `blockFile != null` is always `true` here, since `new File(...)` never returns null; the null check can be dropped.
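   So this can be simplified to:
   ```
   File blockFile = new File(info.getBlockURI());
   if (blockFile.getParentFile() == null) {
   ```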



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAysncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up replication interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Lets wait for the remove replia process
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock());

Review Comment:
   You can remove the `com.google.common.collect.` prefix and write `List<Block> blockList = Lists.newArrayList(extendedBlock.getLocalBlock());`



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -359,6 +371,89 @@ public void run() {
         IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
     }
+
+    private boolean removeReplicaFromMem() {
+      try (AutoCloseableLock lock = fsdatasetImpl.acquireDatasetLockManager().writeLock(
+          DataNodeLockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
+        final ReplicaInfo info = fsdatasetImpl.volumeMap
+            .get(block.getBlockPoolId(), block.getLocalBlock());
+        if (info == null) {
+          ReplicaInfo infoByBlockId =
+              fsdatasetImpl.volumeMap.get(block.getBlockPoolId(),
+                  block.getLocalBlock().getBlockId());
+          if (infoByBlockId == null) {
+            // It is okay if the block is not found -- it
+            // may be deleted earlier.
+            LOG.info("Failed to delete replica " + block.getLocalBlock()
+                + ": ReplicaInfo not found in removeReplicaFromMem.");
+          } else {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                + ": GenerationStamp not matched, existing replica is "
+                + Block.toString(infoByBlockId) + " in removeReplicaFromMem.");
+          }
+          return false;
+        }
+
+        FsVolumeImpl v = (FsVolumeImpl)info.getVolume();
+        if (v == null) {
+          LOG.error("Failed to delete replica " + block.getLocalBlock()
+              +  ". No volume for this replica " + info + " in removeReplicaFromMem.");
+          return false;
+        }
+
+        try {
+          File blockFile = new File(info.getBlockURI());
+          if (blockFile != null && blockFile.getParentFile() == null) {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                +  ". Parent not found for block file: " + blockFile
+                + " in removeReplicaFromMem.");

Review Comment:
   Here too, use parameterized logging, e.g. `LOG.error("...{}...", ...)`
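   For example:
   ```
   LOG.error("Failed to delete replica {}. Parent not found for block file: {}"
       + " in removeReplicaFromMem.", block.getLocalBlock(), blockFile);
   ```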



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAysncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up replication interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Lets wait for the remove replia process

Review Comment:
   Please correct the spelling errors in this comment ("Lets" → "Let's", "replia" → "replica").
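   For example:
   ```
   // Let's wait for the remove replica process.
   ```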



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -359,6 +371,89 @@ public void run() {
         IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
     }
+
+    private boolean removeReplicaFromMem() {
+      try (AutoCloseableLock lock = fsdatasetImpl.acquireDatasetLockManager().writeLock(
+          DataNodeLockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
+        final ReplicaInfo info = fsdatasetImpl.volumeMap
+            .get(block.getBlockPoolId(), block.getLocalBlock());
+        if (info == null) {
+          ReplicaInfo infoByBlockId =
+              fsdatasetImpl.volumeMap.get(block.getBlockPoolId(),
+                  block.getLocalBlock().getBlockId());
+          if (infoByBlockId == null) {
+            // It is okay if the block is not found -- it
+            // may be deleted earlier.
+            LOG.info("Failed to delete replica " + block.getLocalBlock()
+                + ": ReplicaInfo not found in removeReplicaFromMem.");
+          } else {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                + ": GenerationStamp not matched, existing replica is "
+                + Block.toString(infoByBlockId) + " in removeReplicaFromMem.");
+          }
+          return false;
+        }
+
+        FsVolumeImpl v = (FsVolumeImpl)info.getVolume();
+        if (v == null) {
+          LOG.error("Failed to delete replica " + block.getLocalBlock()
+              +  ". No volume for this replica " + info + " in removeReplicaFromMem.");
+          return false;
+        }
+
+        try {
+          File blockFile = new File(info.getBlockURI());
+          if (blockFile != null && blockFile.getParentFile() == null) {
+            LOG.error("Failed to delete replica " + block.getLocalBlock()
+                +  ". Parent not found for block file: " + blockFile
+                + " in removeReplicaFromMem.");
+            return false;
+          }
+        } catch(IllegalArgumentException e) {
+          LOG.warn("Parent directory check failed; replica {} is " +
+              "not backed by a local file in removeReplicaFromMem.", info);
+        }
+
+        if (!this.volume.getStorageID().equals(v.getStorageID())) {
+          LOG.error("Failed to delete replica " + block.getLocalBlock()
+              +  ". Appear different volumes, oldVolume=" + this.volume + " and newVolume=" + v
+              +  " for this replica in removeReplicaFromMem.");
+          return false;
+        }
+
+        ReplicaInfo removing = fsdatasetImpl.volumeMap.remove(block.getBlockPoolId(),
+            block.getLocalBlock());
+        fsdatasetImpl.addDeletingBlock(block.getBlockPoolId(), removing.getBlockId());
+        LOG.debug("Block file {} is to be deleted", removing.getBlockURI());
+        datanode.getMetrics().incrBlocksRemoved(1);
+        if (removing instanceof ReplicaInPipeline) {
+          ((ReplicaInPipeline) removing).releaseAllBytesReserved();
+        }
+      }
+
+      if (volume.isTransientStorage()) {
+        RamDiskReplicaTracker.RamDiskReplica replicaInfo =
+            fsdatasetImpl.ramDiskReplicaTracker.getReplica(block.getBlockPoolId(),
+                block.getLocalBlock().getBlockId());
+        if (replicaInfo != null) {
+          if (!replicaInfo.getIsPersisted()) {
+            datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
+          }
+          fsdatasetImpl.ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
+              replicaInfo.getBlockId(), true);
+        }
+      }
+
+      // If a DFSClient has the replica in its cache of short-circuit file
+      // descriptors (and the client is using ShortCircuitShm), invalidate it.
+      datanode.getShortCircuitRegistry().processBlockInvalidation(
+          new ExtendedBlockId(block.getLocalBlock().getBlockId(), block.getBlockPoolId()));
+
+      // If the block is cached, start uncaching it.
+      fsdatasetImpl.cacheManager.uncacheBlock(
+          block.getBlockPoolId(), block.getLocalBlock().getBlockId());

Review Comment:
   Maybe we can use local variables to store the blockPoolId and local Block to clean up the code, such as:
   ```
   final String blockPoolId = block.getBlockPoolId();
   final Block localBlock = block.getLocalBlock();
   
   fsdatasetImpl.cacheManager.uncacheBlock(blockPoolId, localBlock.getBlockId());
   ```



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAysncDiskServiceDeleteReplica()

Review Comment:
   Could you add a Javadoc description for this test method?
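   For example, something like:
   ```
   /**
    * Test that a replica is not removed from the in-memory volume map
    * before the async disk deletion runs, so that a concurrent reader
    * does not hit ReplicaNotFoundException.
    */
   ```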
   



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAysncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up replication interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Lets wait for the remove replia process
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock());
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      ds.invalidate(bpid, blockList.toArray(new Block[0]));
+
+      // Test get blocks and datanodes.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(3, loc.length);
+      List<String> uuids = com.google.common.collect.Lists.newArrayList();
+      for (DatanodeInfo datanodeInfo : loc) {
+        uuids.add(datanodeInfo.getDatanodeUuid());
+      }
+      assertTrue(uuids.contains(dn.getDatanodeUuid()));
+
+      // Do verification that the first replication shouldn't be deleted from the memory first.
+      // Because the namenode still contains this replica, so client will try to read it.
+      // If this replica is deleted from memory, the client would got an ReplicaNotFoundException.
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+
+      // Make it resume the removeReplicaFromMem method
+      semaphore.release(1);
+
+      // Sleep for 1 second so that datanode can complete invalidate.
+      GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {

Review Comment:
   Can change it to a lambda, such as:
   ```
   GenericTestUtils.waitFor(() -> ds.asyncDiskService.countPendingDeletions() == 0, 100, 1000);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org