Posted to common-issues@hadoop.apache.org by "hfutatzhanghb (via GitHub)" <gi...@apache.org> on 2023/06/07 02:45:19 UTC

[GitHub] [hadoop] hfutatzhanghb commented on a diff in pull request #5643: HDFS-17003. Erasure coding: invalidate wrong block after reporting bad blocks from datanode

hfutatzhanghb commented on code in PR #5643:
URL: https://github.com/apache/hadoop/pull/5643#discussion_r1220738668


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java:
##########
@@ -169,6 +171,108 @@ public void testInvalidateBlock() throws IOException, InterruptedException {
     }
   }
 
+  @Test
+  public void testCorruptionECBlockInvalidate() throws Exception {
+
+    final Path file = new Path("/invalidate_corrupted");
+    final int length = BLOCK_SIZE * NUM_DATA_UNITS;
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(dfs, file, bytes);
+
+    int dnIndex = findFirstDataNode(cluster, dfs, file,
+        CELL_SIZE * NUM_DATA_UNITS);
+    int dnIndex2 = findDataNodeAtIndex(cluster, dfs, file,
+        CELL_SIZE * NUM_DATA_UNITS, 2);
+    Assert.assertNotEquals(-1, dnIndex);
+    Assert.assertNotEquals(-1, dnIndex2);
+
+    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
+        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
+        .get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
+
+    final Block b = blks[0].getBlock().getLocalBlock();
+    final Block b2 = blks[1].getBlock().getLocalBlock();
+
+    // find the first block file
+    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
+    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile.exists());
+    // corrupt the block file
+    LOG.info("Deliberately corrupting file " + blkFile.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile)) {
+      out.write("corruption".getBytes());
+      out.flush();
+    }
+
+    // find the second block file
+    File storageDir2 = cluster.getInstanceStorageDir(dnIndex2, 0);
+    File blkFile2 = MiniDFSCluster.getBlockFile(storageDir2, blks[1].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile2.exists());
+    // corrupt the second block file
+    LOG.info("Deliberately corrupting file " + blkFile2.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile2)) {
+      out.write("corruption".getBytes());
+      out.flush();
+    }
+
+    // disable the heartbeat from DN so that the corrupted block record is kept
+    // in NameNode
+    for (DataNode dataNode : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
+    }
+    try {
+      // do stateful read
+      StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
+          ByteBuffer.allocate(1024));
+
+      // check whether the corruption has been reported to the NameNode
+      final FSNamesystem ns = cluster.getNamesystem();
+      final BlockManager bm = ns.getBlockManager();
+      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
+          .asFile().getBlocks())[0];
+      GenericTestUtils.waitFor(() -> {
+        if (bm.getCorruptReplicas(blockInfo) == null) {
+          return false;
+        }
+        return bm.getCorruptReplicas(blockInfo).size() == 2;
+      }, 250, 60000);
+      // double check
+      Assert.assertEquals(2, bm.getCorruptReplicas(blockInfo).size());
+
+      DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex).getDatanodeId());
+
+      DatanodeDescriptor dnd2 =
+          NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex2).getDatanodeId());
+
+      for (DataNode datanode : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false);

Review Comment:
   fixed
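
The hunk above pauses DataNode heartbeats before the read so that the NameNode keeps its corrupt-replica records while the assertions run, then re-enables them. A minimal sketch of that pause-and-restore pattern, using only the MiniDFSCluster and DataNodeTestUtils calls already visible in the diff (the helper name and the try/finally shape are illustrative, not the PR's exact code):

```java
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;

class HeartbeatPauseSketch {
  // Pause heartbeats so the NameNode keeps the corrupt-replica records
  // while the body runs, then restore them no matter what happens.
  static void withHeartbeatsPaused(MiniDFSCluster cluster, Runnable body)
      throws Exception {
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
    }
    try {
      body.run();
    } finally {
      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
      }
    }
  }
}
```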



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java:
##########
@@ -169,6 +171,129 @@ public void testInvalidateBlock() throws IOException, InterruptedException {
     }
   }
 
+  /**
+   * This unit test tries to cover the following situation:
+   * Suppose we have an EC file with an RS(d,p) policy whose block group id
+   * is blk_-9223372036845119810_1920002.
+   * If the first and second data blocks in this EC block group are corrupted
+   * and meanwhile we read this EC file,
+   * a reportBadBlock RPC will be triggered, and both
+   * blk_-9223372036845119810_1920002
+   * and blk_-9223372036845119809_1920002 will be added to corruptReplicas.
+   * The two blocks will also be reconstructed and IBRs sent to the namenode,
+   * which then executes the BlockManager#addStoredBlock and
+   * invalidateCorruptReplicas methods. Suppose we first receive the IBR of
+   * blk_-9223372036845119810_1920002; then, in the invalidateCorruptReplicas
+   * method, only blk_-9223372036845119809_1920002 will be invalidated on the
+   * two datanodes containing the two corrupt blocks.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCorruptionECBlockInvalidate() throws Exception {
+
+    final Path file = new Path("/invalidate_corrupted");
+    final int length = BLOCK_SIZE * NUM_DATA_UNITS;
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(dfs, file, bytes);
+
+    int dnIndex = findFirstDataNode(cluster, dfs, file,
+        CELL_SIZE * NUM_DATA_UNITS);
+    int dnIndex2 = findDataNodeAtIndex(cluster, dfs, file,
+        CELL_SIZE * NUM_DATA_UNITS, 2);
+    Assert.assertNotEquals(-1, dnIndex);
+    Assert.assertNotEquals(-1, dnIndex2);
+
+    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
+        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
+        .get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
+
+    final Block b = blks[0].getBlock().getLocalBlock();
+    final Block b2 = blks[1].getBlock().getLocalBlock();
+
+    // Find the first block file.
+    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
+    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile.exists());
+    // Corrupt the block file.
+    LOG.info("Deliberately corrupting file " + blkFile.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile)) {
+      out.write("corruption".getBytes());
+      out.flush();
+    }
+
+    // Find the second block file.
+    File storageDir2 = cluster.getInstanceStorageDir(dnIndex2, 0);
+    File blkFile2 = MiniDFSCluster.getBlockFile(storageDir2, blks[1].getBlock());
+    Assert.assertTrue("Block file does not exist", blkFile2.exists());
+    // Corrupt the second block file.
+    LOG.info("Deliberately corrupting file " + blkFile2.getName());
+    try (FileOutputStream out = new FileOutputStream(blkFile2)) {
+      out.write("corruption".getBytes());
+      out.flush();
+    }
+
+    // Disable the heartbeat from DN so that the corrupted block record is kept
+    // in NameNode.
+    for (DataNode dataNode : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
+    }
+    try {
+      // Do stateful read.
+      StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
+          ByteBuffer.allocate(1024));
+
+      // Check whether the corruption has been reported to the NameNode.
+      final FSNamesystem ns = cluster.getNamesystem();
+      final BlockManager bm = ns.getBlockManager();
+      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
+          .asFile().getBlocks())[0];
+      GenericTestUtils.waitFor(() -> {
+        if (bm.getCorruptReplicas(blockInfo) == null) {
+          return false;
+        }
+        return bm.getCorruptReplicas(blockInfo).size() == 2;
+      }, 250, 60000);
+      // Double check.
+      Assert.assertEquals(2, bm.getCorruptReplicas(blockInfo).size());
+
+      DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex).getDatanodeId());
+
+      DatanodeDescriptor dnd2 =
+          NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex2).getDatanodeId());
+
+      for (DataNode datanode : cluster.getDataNodes()) {
+        if (!datanode.getDatanodeUuid().equals(dnd.getDatanodeUuid()) ||

Review Comment:
   @zhangshuyan0 shuyan, I think we should use || here, because when calling the getBlocksToInvalidateByLimit method, it will execute `blockSet.pollN(limit)`, which removes the block from nodeToECBlocks; these blocks will then be added to the DatanodeDescriptor object's invalidateBlocks field.
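
To make the pollN point concrete: pollN is a destructive read, removing up to `limit` elements from the set and handing them back, so a block polled for invalidation is no longer in nodeToECBlocks afterwards. A self-contained sketch of that remove-and-return semantic, with plain Java collections standing in for the HDFS block set (this is an illustration of the behavior described above, not the HDFS implementation):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class PollNSketch {
  // Remove up to `limit` elements from `set` and return them, mimicking
  // the destructive pollN(limit) behavior described in the comment above.
  static <T> List<T> pollN(Set<T> set, int limit) {
    List<T> polled = new ArrayList<>();
    Iterator<T> it = set.iterator();
    while (it.hasNext() && polled.size() < limit) {
      polled.add(it.next());
      it.remove(); // the element leaves the source set here
    }
    return polled;
  }

  public static void main(String[] args) {
    Set<String> nodeToECBlocks = new LinkedHashSet<>(Arrays.asList(
        "blk_-9223372036845119810", "blk_-9223372036845119809"));
    List<String> toInvalidate = pollN(nodeToECBlocks, 1);
    // The polled block is gone from the set; in the scenario above it would
    // now move to the DatanodeDescriptor's invalidateBlocks.
    System.out.println(toInvalidate);    // [blk_-9223372036845119810]
    System.out.println(nodeToECBlocks);  // [blk_-9223372036845119809]
  }
}
```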

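On the block IDs in the Javadoc above: as I understand the striped block ID scheme, the internal blocks of a group take consecutive IDs derived from the block group ID (internal block i gets groupId + i, since the group ID's low index bits are zero), which is why the two corrupted data blocks differ by exactly one. A small sketch of that arithmetic (the helper name is ours for illustration, not an HDFS API):

```java
class StripedBlockIdSketch {
  // Internal block i of a striped block group takes the ID groupId + i;
  // parseStripedBlockGroup in the test resolves internal blocks this way.
  static long internalBlockId(long blockGroupId, int indexInGroup) {
    return blockGroupId + indexInGroup;
  }

  public static void main(String[] args) {
    long groupId = -9223372036845119810L; // group ID from the Javadoc
    System.out.println("blk_" + internalBlockId(groupId, 0)); // blk_-9223372036845119810
    System.out.println("blk_" + internalBlockId(groupId, 1)); // blk_-9223372036845119809
  }
}
```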


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

