You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2020/12/15 05:58:35 UTC

[hadoop] 01/02: HDFS-15187. CORRUPT replica mismatch between namenodes after failover. Contributed by Ayush Saxena.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a68ca4a292a430640ba34e46389fc7aaf785c071
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Mon Feb 24 20:38:04 2020 +0530

    HDFS-15187. CORRUPT replica mismatch between namenodes after failover. Contributed by Ayush Saxena.
---
 .../hdfs/server/blockmanagement/BlockManager.java  | 32 ++++++---
 .../TestCorruptionWithFailover.java                | 81 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8cff375..c128666 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3162,6 +3162,7 @@ public class BlockManager implements BlockStatsMXBean {
   
   private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
       throws IOException {
+    boolean isPreviousMessageProcessed = true;
     for (ReportedBlockInfo rbi : rbis) {
       LOG.debug("Processing previouly queued message {}", rbi);
       if (rbi.getReportedState() == null) {
@@ -3169,9 +3170,15 @@ public class BlockManager implements BlockStatsMXBean {
         DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
         removeStoredBlock(getStoredBlock(rbi.getBlock()),
             storageInfo.getDatanodeDescriptor());
+      } else if (!isPreviousMessageProcessed) {
+        // if the previous IBR processing was skipped, skip processing all
+        // further IBR's so as to ensure same sequence of processing.
+        queueReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
+            rbi.getReportedState(), QUEUE_REASON_FUTURE_GENSTAMP);
       } else {
-        processAndHandleReportedBlock(rbi.getStorageInfo(),
-            rbi.getBlock(), rbi.getReportedState(), null);
+        isPreviousMessageProcessed =
+            processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
+                rbi.getReportedState(), null);
       }
     }
   }
@@ -4079,8 +4086,14 @@ public class BlockManager implements BlockStatsMXBean {
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
-  
-  private void processAndHandleReportedBlock(
+
+  /**
+   * Process a reported block.
+   * @return true if the block is processed, or false if the block is queued
+   * to be processed later.
+   * @throws IOException
+   */
+  private boolean processAndHandleReportedBlock(
       DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
@@ -4094,7 +4107,7 @@ public class BlockManager implements BlockStatsMXBean {
         isGenStampInFuture(block)) {
       queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
-      return;
+      return false;
     }
 
     // find block by blockId
@@ -4105,7 +4118,7 @@ public class BlockManager implements BlockStatsMXBean {
       blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
           "belong to any file", block, node, block.getNumBytes());
       addToInvalidates(new Block(block), node);
-      return;
+      return true;
     }
 
     BlockUCState ucState = storedBlock.getBlockUCState();
@@ -4114,7 +4127,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     // Ignore replicas already scheduled to be removed from the DN
     if(invalidateBlocks.contains(node, block)) {
-      return;
+      return true;
     }
 
     BlockToMarkCorrupt c = checkReplicaCorrupt(
@@ -4132,14 +4145,14 @@ public class BlockManager implements BlockStatsMXBean {
       } else {
         markBlockAsCorrupt(c, storageInfo, node);
       }
-      return;
+      return true;
     }
 
     if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
       addStoredBlockUnderConstruction(
           new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
           storageInfo);
-      return;
+      return true;
     }
 
     // Add replica if appropriate. If the replica was previously corrupt
@@ -4149,6 +4162,7 @@ public class BlockManager implements BlockStatsMXBean {
             corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
       addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
     }
+    return true;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
new file mode 100644
index 0000000..f5899c0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests corruption of replicas in case of failover.
+ */
+public class TestCorruptionWithFailover {
+
+  @Test
+  public void testCorruptReplicaAfterFailover() throws Exception {
+    Configuration conf = new Configuration();
+    // Enable data to be written, to less replicas in case of pipeline failure.
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3)
+        .build()) {
+      cluster.transitionToActive(0);
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem(0);
+      FSDataOutputStream out = dfs.create(new Path("/dir/file"));
+      // Write some data and flush.
+      for (int i = 0; i < 1024 * 1024; i++) {
+        out.write(i);
+      }
+      out.hsync();
+      // Stop one datanode, so as to trigger update pipeline.
+      MiniDFSCluster.DataNodeProperties dn = cluster.stopDataNode(0);
+      // Write some more data and close the file.
+      for (int i = 0; i < 1024 * 1024; i++) {
+        out.write(i);
+      }
+      out.close();
+      BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
+      BlockManager bm1 = cluster.getNamesystem(1).getBlockManager();
+      // Mark datanodes as stale, as are marked if a namenode went through a
+      // failover, to prevent replica deletion.
+      bm0.getDatanodeManager().markAllDatanodesStale();
+      bm1.getDatanodeManager().markAllDatanodesStale();
+      // Restart the datanode
+      cluster.restartDataNode(dn);
+      // The replica from the datanode will be having lesser genstamp, so
+      // would be marked as CORRUPT.
+      GenericTestUtils.waitFor(() -> bm0.getCorruptBlocks() == 1, 100, 10000);
+
+      // Perform failover to other namenode
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+      // The corrupt count should be same as first namenode.
+      GenericTestUtils.waitFor(() -> bm1.getCorruptBlocks() == 1, 100, 10000);
+    }
+  }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org