You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2022/06/20 02:55:49 UTC

[hadoop] branch branch-3.2 updated: HDFS-16064. Determine when to invalidate corrupt replicas based on number of usable replicas (#4410)

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 4dad353597b HDFS-16064. Determine when to invalidate corrupt replicas based on number of usable replicas (#4410)
4dad353597b is described below

commit 4dad353597b799eaa8511b683bcaf569e8ad6acc
Author: KevinWikant <94...@users.noreply.github.com>
AuthorDate: Sun Jun 19 22:20:24 2022 -0400

    HDFS-16064. Determine when to invalidate corrupt replicas based on number of usable replicas (#4410)
    
    Co-authored-by: Kevin Wikant <wi...@amazon.com>
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
    (cherry picked from commit cfceaebde6028f9604421a9ae10dda34bc1f9532)
    
     Conflicts:
            hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
---
 .../hdfs/server/blockmanagement/BlockManager.java  |  18 +-
 .../org/apache/hadoop/hdfs/TestDecommission.java   | 282 +++++++++++++++++++++
 2 files changed, 294 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 9546be16d75..8a2782f955c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1784,23 +1784,29 @@ public class BlockManager implements BlockStatsMXBean {
         b.getReasonCode(), b.getStored().isStriped());
 
     NumberReplicas numberOfReplicas = countNodes(b.getStored());
-    boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
+    final int numUsableReplicas = numberOfReplicas.liveReplicas() +
+        numberOfReplicas.decommissioning() +
+        numberOfReplicas.liveEnteringMaintenanceReplicas();
+    boolean hasEnoughLiveReplicas = numUsableReplicas >=
         expectedRedundancies;
 
     boolean minReplicationSatisfied = hasMinStorage(b.getStored(),
-        numberOfReplicas.liveReplicas());
+        numUsableReplicas);
 
     boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
         (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
         expectedRedundancies;
     boolean corruptedDuringWrite = minReplicationSatisfied &&
         b.isCorruptedDuringWrite();
-    // case 1: have enough number of live replicas
-    // case 2: corrupted replicas + live replicas > Replication factor
+    // case 1: have enough number of usable replicas
+    // case 2: corrupted replicas + usable replicas > Replication factor
     // case 3: Block is marked corrupt due to failure while writing. In this
     //         case genstamp will be different than that of valid block.
     // In all these cases we can delete the replica.
-    // In case of 3, rbw block will be deleted and valid block can be replicated
+    // In case 3, rbw block will be deleted and valid block can be replicated.
+    // Note NN only becomes aware of corrupt blocks when the block report is sent,
+    // this means that by default it can take up to 6 hours for a corrupt block to
+    // be invalidated, after which the valid block can be replicated.
     if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
         || corruptedDuringWrite) {
       if (b.getStored().isStriped()) {
@@ -3491,7 +3497,7 @@ public class BlockManager implements BlockStatsMXBean {
           ". blockMap has {} but corrupt replicas map has {}",
           storedBlock, numCorruptNodes, corruptReplicasCount);
     }
-    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileRedundancy)) {
+    if ((corruptReplicasCount > 0) && (numUsableReplicas >= fileRedundancy)) {
       invalidateCorruptReplicas(storedBlock, reportedBlock, num);
     }
     return storedBlock;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index 39b93f281a4..dfa3d77824d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -1818,4 +1819,285 @@ public class TestDecommission extends AdminStatesBaseTest {
         !BlockManagerTestUtil.isNodeHealthyForDecommissionOrMaintenance(blockManager, node)
             && !node.isAlive()), 500, 20000);
   }
+
+  /*
+  This test reproduces a scenario where an under-replicated block on a decommissioning node
+  cannot be replicated to some datanodes because they have a corrupt replica of the block.
+  The test ensures that the corrupt replicas are eventually invalidated so that the
+  under-replicated block can be replicated to sufficient datanodes & the decommissioning
+  node can be decommissioned.
+   */
+  @Test(timeout = 60000)
+  public void testDeleteCorruptReplicaForUnderReplicatedBlock() throws Exception {
+    // Constants
+    final Path file = new Path("/test-file");
+    final int numDatanode = 3;
+    final short replicationFactor = 2;
+    final int numStoppedNodes = 2;
+    final int numDecommNodes = 1;
+    assertEquals(numDatanode, numStoppedNodes + numDecommNodes);
+
+    // Run monitor every 5 seconds to speed up decommissioning & make the test faster
+    final int datanodeAdminMonitorFixedRateSeconds = 5;
+    getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
+        datanodeAdminMonitorFixedRateSeconds);
+    // Set block report interval to 6 hours to avoid unexpected block reports.
+    // The default block report interval is different for a MiniDFSCluster
+    getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    // Run the BlockManager RedundancyMonitor every 3 seconds such that the Namenode
+    // sends under-replication blocks for replication frequently
+    getConf().setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT);
+    // Ensure that the DataStreamer client will replace the bad datanode on append failure
+    getConf().set(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, "ALWAYS");
+    // Avoid having the DataStreamer client fail the append operation if datanode replacement fails
+    getConf()
+        .setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
+
+    // References to datanodes in the cluster
+    // - 2 datanode will be stopped to generate corrupt block replicas & then
+    //     restarted later to validate the corrupt replicas are invalidated
+    // - 1 datanode will start decommissioning to make the block under replicated
+    final List<DatanodeDescriptor> allNodes = new ArrayList<>();
+    final List<DatanodeDescriptor> stoppedNodes = new ArrayList<>();
+    final DatanodeDescriptor decommNode;
+
+    // Create MiniDFSCluster
+    startCluster(1, numDatanode);
+    getCluster().waitActive();
+    final FSNamesystem namesystem = getCluster().getNamesystem();
+    final BlockManager blockManager = namesystem.getBlockManager();
+    final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
+    final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
+    final FileSystem fs = getCluster().getFileSystem();
+
+    // Get DatanodeDescriptors
+    for (final DataNode node : getCluster().getDataNodes()) {
+      allNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid()));
+    }
+
+    // Create block with 2 FINALIZED replicas
+    // Note that:
+    // - calling hflush leaves block in state ReplicaBeingWritten
+    // - calling close leaves the block in state FINALIZED
+    // - amount of data is kept small because flush is not synchronous
+    LOG.info("Creating Initial Block with {} FINALIZED replicas", replicationFactor);
+    FSDataOutputStream out = fs.create(file, replicationFactor);
+    for (int i = 0; i < 512; i++) {
+      out.write(i);
+    }
+    out.close();
+
+    // Validate the block exists with expected number of replicas
+    assertEquals(1, blockManager.getTotalBlocks());
+    BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    List<String> replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(replicationFactor, replicasInBlock.size());
+
+    // Identify the DatanodeDescriptors associated with the 2 nodes with replicas.
+    // Each of nodes with a replica will be stopped later to corrupt the replica
+    DatanodeDescriptor decommNodeTmp = null;
+    for (DatanodeDescriptor node : allNodes) {
+      if (replicasInBlock.contains(node.getName())) {
+        stoppedNodes.add(node);
+      } else {
+        decommNodeTmp = node;
+      }
+    }
+    assertEquals(numStoppedNodes, stoppedNodes.size());
+    assertNotNull(decommNodeTmp);
+    decommNode = decommNodeTmp;
+    final DatanodeDescriptor firstStoppedNode = stoppedNodes.get(0);
+    final DatanodeDescriptor secondStoppedNode = stoppedNodes.get(1);
+    LOG.info("Detected 2 nodes with replicas : {} , {}", firstStoppedNode.getXferAddr(),
+        secondStoppedNode.getXferAddr());
+    LOG.info("Detected 1 node without replica : {}", decommNode.getXferAddr());
+
+    // Stop firstStoppedNode & the append to the block pipeline such that DataStreamer client:
+    // - detects firstStoppedNode as bad link in block pipeline
+    // - replaces the firstStoppedNode with decommNode in block pipeline
+    // The result is that:
+    // - secondStoppedNode & decommNode have a live block replica
+    // - firstStoppedNode has a corrupt replica (corrupt because of old GenStamp)
+    LOG.info("Stopping first node with replica {}", firstStoppedNode.getXferAddr());
+    final List<MiniDFSCluster.DataNodeProperties> stoppedNodeProps = new ArrayList<>();
+    MiniDFSCluster.DataNodeProperties stoppedNodeProp =
+        getCluster().stopDataNode(firstStoppedNode.getXferAddr());
+    stoppedNodeProps.add(stoppedNodeProp);
+    firstStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past
+    // Wait for NN to detect the datanode as dead
+    GenericTestUtils.waitFor(
+        () -> 2 == datanodeManager.getNumLiveDataNodes() && 1 == datanodeManager
+            .getNumDeadDataNodes(), 500, 30000);
+    // Append to block pipeline
+    appendBlock(fs, file, 2);
+
+    // Stop secondStoppedNode & the append to the block pipeline such that DataStreamer client:
+    // - detects secondStoppedNode as bad link in block pipeline
+    // - attempts to replace secondStoppedNode but cannot because there are no more live nodes
+    // - appends to the block pipeline containing just decommNode
+    // The result is that:
+    // - decommNode has a live block replica
+    // - firstStoppedNode & secondStoppedNode both have a corrupt replica
+    LOG.info("Stopping second node with replica {}", secondStoppedNode.getXferAddr());
+    stoppedNodeProp = getCluster().stopDataNode(secondStoppedNode.getXferAddr());
+    stoppedNodeProps.add(stoppedNodeProp);
+    secondStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past
+    // Wait for NN to detect the datanode as dead
+    GenericTestUtils.waitFor(() -> numDecommNodes == datanodeManager.getNumLiveDataNodes()
+        && numStoppedNodes == datanodeManager.getNumDeadDataNodes(), 500, 30000);
+    // Append to block pipeline
+    appendBlock(fs, file, 1);
+
+    // Validate block replica locations
+    blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(numDecommNodes, replicasInBlock.size());
+    assertTrue(replicasInBlock.contains(decommNode.getName()));
+    LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 live replica on {}",
+        firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
+
+    LOG.info("Decommission node {} with the live replica", decommNode.getXferAddr());
+    final ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+    takeNodeOutofService(0, decommNode.getDatanodeUuid(), 0, decommissionedNodes,
+        AdminStates.DECOMMISSION_INPROGRESS);
+
+    // Wait for the datanode to start decommissioning
+    try {
+      GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0
+          && decomManager.getNumPendingNodes() == numDecommNodes && decommNode.getAdminState()
+          .equals(AdminStates.DECOMMISSION_INPROGRESS), 500, 30000);
+    } catch (Exception e) {
+      blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+      assertEquals(1, blocksInFile.length);
+      replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+      String errMsg = String.format("Node %s failed to start decommissioning."
+              + " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]",
+          decommNode.getXferAddr(), decomManager.getNumTrackedNodes(),
+          decomManager.getNumPendingNodes(), decommNode.getAdminState(),
+          String.join(", ", replicasInBlock));
+      LOG.error(errMsg); // Do not log generic timeout exception
+      fail(errMsg);
+    }
+
+    // Validate block replica locations
+    blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(numDecommNodes, replicasInBlock.size());
+    assertEquals(replicasInBlock.get(0), decommNode.getName());
+    LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 decommissioning replica on {}",
+        firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
+
+    // Restart the 2 stopped datanodes
+    LOG.info("Restarting stopped nodes {} , {}", firstStoppedNode.getXferAddr(),
+        secondStoppedNode.getXferAddr());
+    for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) {
+      assertTrue(getCluster().restartDataNode(stoppedNode));
+    }
+    for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) {
+      try {
+        getCluster().waitDatanodeFullyStarted(stoppedNode.getDatanode(), 30000);
+        LOG.info("Node {} Restarted", stoppedNode.getDatanode().getXferAddress());
+      } catch (Exception e) {
+        String errMsg = String.format("Node %s Failed to Restart within 30 seconds",
+            stoppedNode.getDatanode().getXferAddress());
+        LOG.error(errMsg); // Do not log generic timeout exception
+        fail(errMsg);
+      }
+    }
+
+    // Trigger block reports for the 2 restarted nodes to ensure their corrupt
+    // block replicas are identified by the namenode
+    for (MiniDFSCluster.DataNodeProperties dnProps : stoppedNodeProps) {
+      DataNodeTestUtils.triggerBlockReport(dnProps.getDatanode());
+    }
+
+    // Validate the datanode is eventually decommissioned
+    // Some changes are needed to ensure replication/decommissioning occur in a timely manner:
+    // - if the namenode sends a DNA_TRANSFER before sending the DNA_INVALIDATE's then:
+    //   - the block will enter the pendingReconstruction queue
+    //   - this prevent the block from being sent for transfer again for some time
+    // - solution is to call "clearQueues" so that DNA_TRANSFER is sent again after DNA_INVALIDATE
+    // - need to run the check less frequently than DatanodeAdminMonitor
+    //       such that in between "clearQueues" calls 2 things can occur:
+    //   - DatanodeAdminMonitor runs which sets the block as neededReplication
+    //   - datanode heartbeat is received which sends the DNA_TRANSFER to the node
+    final int checkEveryMillis = datanodeAdminMonitorFixedRateSeconds * 2 * 1000;
+    try {
+      GenericTestUtils.waitFor(() -> {
+        blockManager.clearQueues(); // Clear pendingReconstruction queue
+        return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0
+            && decommNode.getAdminState().equals(AdminStates.DECOMMISSIONED);
+      }, checkEveryMillis, 40000);
+    } catch (Exception e) {
+      blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+      assertEquals(1, blocksInFile.length);
+      replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+      String errMsg = String.format("Node %s failed to complete decommissioning."
+              + " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]",
+          decommNode.getXferAddr(), decomManager.getNumTrackedNodes(),
+          decomManager.getNumPendingNodes(), decommNode.getAdminState(),
+          String.join(", ", replicasInBlock));
+      LOG.error(errMsg); // Do not log generic timeout exception
+      fail(errMsg);
+    }
+
+    // Validate block replica locations.
+    // Note that in order for decommissioning to complete the block must be
+    // replicated to both of the restarted datanodes; this implies that the
+    // corrupt replicas were invalidated on both of the restarted datanodes.
+    blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+    assertEquals(1, blocksInFile.length);
+    replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+    assertEquals(numDatanode, replicasInBlock.size());
+    assertTrue(replicasInBlock.contains(decommNode.getName()));
+    for (final DatanodeDescriptor node : stoppedNodes) {
+      assertTrue(replicasInBlock.contains(node.getName()));
+    }
+    LOG.info("Block now has 2 live replicas on [{} , {}] and 1 decommissioned replica on {}",
+        firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr());
+  }
+
+  void appendBlock(final FileSystem fs, final Path file, int expectedReplicas) throws IOException {
+    LOG.info("Appending to the block pipeline");
+    boolean failed = false;
+    Exception failedReason = null;
+    try {
+      FSDataOutputStream out = fs.append(file);
+      for (int i = 0; i < 512; i++) {
+        out.write(i);
+      }
+      out.close();
+    } catch (Exception e) {
+      failed = true;
+      failedReason = e;
+    } finally {
+      BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0);
+      assertEquals(1, blocksInFile.length);
+      List<String> replicasInBlock = Arrays.asList(blocksInFile[0].getNames());
+      if (failed) {
+        String errMsg = String.format(
+            "Unexpected exception appending to the block pipeline."
+                + " nodesWithReplica=[%s]", String.join(", ", replicasInBlock));
+        LOG.error(errMsg, failedReason); // Do not swallow the exception
+        fail(errMsg);
+      } else if (expectedReplicas != replicasInBlock.size()) {
+        String errMsg = String.format("Expecting %d replicas in block pipeline,"
+                + " unexpectedly found %d replicas. nodesWithReplica=[%s]", expectedReplicas,
+            replicasInBlock.size(), String.join(", ", replicasInBlock));
+        LOG.error(errMsg);
+        fail(errMsg);
+      } else {
+        String infoMsg = String.format(
+            "Successfully appended block pipeline with %d replicas."
+                + " nodesWithReplica=[%s]",
+            replicasInBlock.size(), String.join(", ", replicasInBlock));
+        LOG.info(infoMsg);
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org