Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/09/20 04:19:29 UTC

[GitHub] [hadoop] Hexiaoqiao commented on a diff in pull request #4903: HDFS-16774. Improve async delete replica on datanode

Hexiaoqiao commented on code in PR #4903:
URL: https://github.com/apache/hadoop/pull/4903#discussion_r974869052


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAsyncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up the redundancy recheck interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Block the replica removal until the test releases the semaphore.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock());

Review Comment:
   +1



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAsyncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up the redundancy recheck interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Block the replica removal until the test releases the semaphore.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock());
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      ds.invalidate(bpid, blockList.toArray(new Block[0]));
+
+      // Test get blocks and datanodes.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(3, loc.length);
+      List<String> uuids = com.google.common.collect.Lists.newArrayList();
+      for (DatanodeInfo datanodeInfo : loc) {
+        uuids.add(datanodeInfo.getDatanodeUuid());
+      }
+      assertTrue(uuids.contains(dn.getDatanodeUuid()));
+
+      // Verify that the replica has not yet been removed from memory.
+      // The namenode still reports this replica, so a client may try to read it;
+      // if it had already been removed from memory, the client would get a
+      // ReplicaNotFoundException.
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+
+      // Resume the removeReplicaFromMem method.
+      semaphore.release(1);
+
+      // Wait until the datanode has completed the pending invalidation.
+      GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return ds.asyncDiskService.countPendingDeletions() == 0;
+        }
+      }, 100, 1000);
+
+      // Sleep for two heartbeat intervals (the default heartbeat interval is 3 seconds).
+      Thread.sleep(6000);
+
+      // Test get blocks and datanodes again.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(2, loc.length);
+      uuids = com.google.common.collect.Lists.newArrayList();

Review Comment:
   Same as above comment.



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java:
##########
@@ -359,6 +371,89 @@ public void run() {
         IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
     }
+
+    private boolean removeReplicaFromMem() {

Review Comment:
   IMO, it would be more appropriate to move this method to the `FsDatasetImpl` class. What do you think?
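   
   For illustration, a minimal generic sketch of that refactor (placeholder names, not the actual Hadoop classes): the dataset owns the in-memory removal, and the async deletion task only delegates to it.
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   class Dataset {
     private final Map<Long, String> replicaMap = new ConcurrentHashMap<>();
   
     // Centralizing the removal here keeps all replica-map mutations
     // inside the dataset class.
     boolean removeReplicaFromMem(long blockId) {
       return replicaMap.remove(blockId) != null;
     }
   }
   
   class ReplicaDeletionTask implements Runnable {
     private final Dataset dataset;
     private final long blockId;
   
     ReplicaDeletionTask(Dataset dataset, long blockId) {
       this.dataset = dataset;
       this.blockId = blockId;
     }
   
     @Override
     public void run() {
       // Drop the replica from memory first, then delete the on-disk files.
       if (dataset.removeReplicaFromMem(blockId)) {
         // ... unlink block and meta files here ...
       }
     }
   }
   ```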



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java:
##########
@@ -1830,4 +1833,92 @@ public void testTransferAndNativeCopyMetrics() throws IOException {
       assertEquals(3, metrics.getNativeCopyIoQuantiles().length);
     }
   }
+
+  @Test
+  public void testAsyncDiskServiceDeleteReplica()
+      throws IOException, InterruptedException, TimeoutException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Bump up the redundancy recheck interval.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      cluster.waitActive();
+      final DataNodeFaultInjector injector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDeleteReplica() {
+          // Block the replica removal until the test releases the semaphore.
+          try {
+            semaphore.acquire(1);
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      };
+      DataNodeFaultInjector.set(injector);
+
+      // Create file.
+      Path path = new Path("/testfile");
+      DFSTestUtil.createFile(fs, path, 1024, (short) 3, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 3);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      ExtendedBlock extendedBlock = lb.getBlock();
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(3, loc.length);
+
+      // DN side.
+      DataNode dn = cluster.getDataNode(loc[0].getIpcPort());
+      final FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      List<Block> blockList = com.google.common.collect.Lists.newArrayList(extendedBlock.getLocalBlock());
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+      ds.invalidate(bpid, blockList.toArray(new Block[0]));
+
+      // Test get blocks and datanodes.
+      loc = DFSTestUtil.getAllBlocks(fs, path).get(0).getLocations();
+      assertEquals(3, loc.length);
+      List<String> uuids = com.google.common.collect.Lists.newArrayList();
+      for (DatanodeInfo datanodeInfo : loc) {
+        uuids.add(datanodeInfo.getDatanodeUuid());
+      }
+      assertTrue(uuids.contains(dn.getDatanodeUuid()));
+
+      // Verify that the replica has not yet been removed from memory.
+      // The namenode still reports this replica, so a client may try to read it;
+      // if it had already been removed from memory, the client would get a
+      // ReplicaNotFoundException.
+      assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
+
+      // Resume the removeReplicaFromMem method.
+      semaphore.release(1);
+
+      // Wait until the datanode has completed the pending invalidation.
+      GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return ds.asyncDiskService.countPendingDeletions() == 0;
+        }
+      }, 100, 1000);
+
+      // Sleep for two heartbeat intervals (the default heartbeat interval is 3 seconds).
+      Thread.sleep(6000);

Review Comment:
   Could this fail unexpectedly if the default heartbeat interval ever changes? I think we should read the heartbeat interval from the configuration here.
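   
   For example (a hedged sketch, assuming `java.util.concurrent.TimeUnit` is imported and the test's `conf` is in scope), the sleep could be derived from the configured heartbeat interval instead of hardcoding 6000 ms:
   
   ```java
   // Derive the wait from the configured heartbeat interval rather than
   // assuming the 3-second default.
   long heartbeatIntervalSecs = conf.getTimeDuration(
       DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
       DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
   // Sleep for two heartbeat intervals so the namenode processes the
   // datanode's deletion report.
   Thread.sleep(2 * heartbeatIntervalSecs * 1000);
   ```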



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java:
##########
@@ -2310,10 +2310,10 @@ private void invalidate(String bpid, Block[] invalidBlks, boolean async)
       throws IOException {
     final List<String> errors = new ArrayList<String>();
     for (int i = 0; i < invalidBlks.length; i++) {
-      final ReplicaInfo removing;
+      final ReplicaInfo info;

Review Comment:
   Is it possible to move the segment at L2313~L2349 to the async path as well? It performs some IO requests. (This is not a blocker.)
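   
   A minimal generic sketch of that idea (illustrative placeholder names, not `FsDatasetImpl` itself): push the whole per-block unit of work, including the IO-bearing lookup and unlink steps, onto the async deletion executor so `invalidate()` only schedules work.
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   class AsyncInvalidateSketch {
     private final ExecutorService deletionExecutor =
         Executors.newSingleThreadExecutor();
   
     void invalidate(long[] blockIds) {
       for (long blockId : blockIds) {
         // Scheduling is cheap; the IO-bearing steps run on the executor thread.
         deletionExecutor.execute(() -> {
           // 1. Resolve the replica and validate its state (previously inline).
           // 2. Remove the replica from the in-memory map.
           // 3. Unlink the block and meta files on disk.
         });
       }
     }
   }
   ```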



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

