You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2020/12/15 05:58:37 UTC

[hadoop] branch branch-3.2 updated (ed78df0 -> 079d3ea)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from ed78df0  HDFS-15170. EC: Block gets marked as CORRUPT in case of failover and pipeline recovery. Contributed by Ayush Saxena.
     new 40285be  HDFS-15187. CORRUPT replica mismatch between namenodes after failover. Contributed by Ayush Saxena.
     new 079d3ea  HDFS-15200. Delete Corrupt Replica Immediately Irrespective of Replicas On Stale Storage. Contributed by Ayush Saxena.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  7 ++
 .../hdfs/server/blockmanagement/BlockManager.java  | 43 ++++++++++---
 .../src/main/resources/hdfs-default.xml            |  9 +++
 .../server/blockmanagement/TestBlockManager.java   | 38 ++++++++++-
 ...uption.java => TestCorruptionWithFailover.java} | 74 ++++++++++------------
 .../hadoop/hdfs/server/namenode/TestFsck.java      |  3 +
 6 files changed, 123 insertions(+), 51 deletions(-)
 copy hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/{TestErasureCodingCorruption.java => TestCorruptionWithFailover.java} (53%)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/02: HDFS-15200. Delete Corrupt Replica Immediately Irrespective of Replicas On Stale Storage. Contributed by Ayush Saxena.

Posted by ay...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 079d3ea6ac0da8e1b7548a1ea0bb18b0bba95776
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Tue Dec 15 10:47:48 2020 +0530

    HDFS-15200. Delete Corrupt Replica Immediately Irrespective of Replicas On Stale Storage. Contributed by Ayush Saxena.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  7 ++++
 .../hdfs/server/blockmanagement/BlockManager.java  | 11 ++++++-
 .../src/main/resources/hdfs-default.xml            |  9 +++++
 .../server/blockmanagement/TestBlockManager.java   | 38 +++++++++++++++++++++-
 .../TestCorruptionWithFailover.java                |  4 +++
 .../hadoop/hdfs/server/namenode/TestFsck.java      |  3 ++
 6 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 3164e42..5eee8df 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -275,6 +275,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       = "dfs.namenode.blockreport.queue.size";
   public static final int    DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT
       = 1024;
+
+  public static final String
+      DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED =
+      "dfs.namenode.corrupt.block.delete.immediately.enabled";
+  public static final boolean
+      DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT = true;
+
   public static final String  DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
   /* Phrased as below to avoid javac inlining as a constant, to match the behavior when
      this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index c8d8598..e4bf792 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -455,6 +455,11 @@ public class BlockManager implements BlockStatsMXBean {
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
    */
   private final short minReplicationToBeInMaintenance;
+  /**
+   * Whether to delete corrupt replica immediately irrespective of other
+   * replicas available on stale storages.
+   */
+  private final boolean deleteCorruptReplicaImmediately;
 
   /** Storages accessible from multiple DNs. */
   private final ProvidedStorageMap providedStorageMap;
@@ -607,6 +612,10 @@ public class BlockManager implements BlockStatsMXBean {
         DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
     blockReportThread = new BlockReportProcessingThread(queueSize);
 
+    this.deleteCorruptReplicaImmediately =
+        conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
+            DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);
+
     LOG.info("defaultReplication         = {}", defaultReplication);
     LOG.info("maxReplication             = {}", maxReplication);
     LOG.info("minReplication             = {}", minReplication);
@@ -1864,7 +1873,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // Check how many copies we have of the block
-    if (nr.replicasOnStaleNodes() > 0) {
+    if (nr.replicasOnStaleNodes() > 0 && !deleteCorruptReplicaImmediately) {
       blockLog.debug("BLOCK* invalidateBlocks: postponing " +
           "invalidation of {} on {} because {} replica(s) are located on " +
           "nodes with potentially out-of-date block reports", b, dn,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 8375cff..09c4f54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5427,6 +5427,15 @@
   </property>
 
   <property>
+    <name>dfs.namenode.corrupt.block.delete.immediately.enabled</name>
+    <value>true</value>
+    <description>
+      Whether the corrupt replicas should be deleted immediately, irrespective
+      of other replicas on stale storages.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.journalnode.edits.dir.perm</name>
     <value>700</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index fea80e5..c333226 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -21,7 +21,9 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
@@ -501,7 +503,41 @@ public class TestBlockManager {
       }
     }
   }
-  
+
+  @Test(timeout = 60000)
+  public void testDeleteCorruptReplicaWithStatleStorages() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    Path file = new Path("/test-file");
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    try {
+      cluster.waitActive();
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      blockManager.getDatanodeManager().markAllDatanodesStale();
+      FileSystem fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(file);
+      for (int i = 0; i < 1024 * 1024 * 1; i++) {
+        out.write(i);
+      }
+      out.hflush();
+      MiniDFSCluster.DataNodeProperties datanode = cluster.stopDataNode(0);
+      for (int i = 0; i < 1024 * 1024 * 1; i++) {
+        out.write(i);
+      }
+      out.close();
+      cluster.restartDataNode(datanode);
+      cluster.triggerBlockReports();
+      DataNodeTestUtils.triggerBlockReport(datanode.getDatanode());
+      assertEquals(0, blockManager.getCorruptBlocks());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * Tell the block manager that replication is completed for the given
    * pipeline.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
index f5899c0..06e4f60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED;
+
 /**
  * Tests corruption of replicas in case of failover.
  */
@@ -35,6 +37,8 @@ public class TestCorruptionWithFailover {
   @Test
   public void testCorruptReplicaAfterFailover() throws Exception {
     Configuration conf = new Configuration();
+    conf.setBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
+        false);
     // Enable data to be written, to less replicas in case of pipeline failure.
     conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
         MIN_REPLICATION, 2);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index 7d2a45a..09561c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED;
 import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -188,6 +189,8 @@ public class TestFsck {
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
+    conf.setBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
+        false);
   }
 
   @After


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: HDFS-15187. CORRUPT replica mismatch between namenodes after failover. Contributed by Ayush Saxena.

Posted by ay...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 40285be1d54d8a50c13347abed4501282df838c0
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Mon Feb 24 20:38:04 2020 +0530

    HDFS-15187. CORRUPT replica mismatch between namenodes after failover. Contributed by Ayush Saxena.
---
 .../hdfs/server/blockmanagement/BlockManager.java  | 32 ++++++---
 .../TestCorruptionWithFailover.java                | 81 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6bfb2c6..c8d8598 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3175,6 +3175,7 @@ public class BlockManager implements BlockStatsMXBean {
   
   private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
       throws IOException {
+    boolean isPreviousMessageProcessed = true;
     for (ReportedBlockInfo rbi : rbis) {
       LOG.debug("Processing previouly queued message {}", rbi);
       if (rbi.getReportedState() == null) {
@@ -3182,9 +3183,15 @@ public class BlockManager implements BlockStatsMXBean {
         DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
         removeStoredBlock(getStoredBlock(rbi.getBlock()),
             storageInfo.getDatanodeDescriptor());
+      } else if (!isPreviousMessageProcessed) {
+        // If processing of the previous IBR was skipped, skip all further
+        // IBRs as well, so that they are processed in the same sequence.
+        queueReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
+            rbi.getReportedState(), QUEUE_REASON_FUTURE_GENSTAMP);
       } else {
-        processAndHandleReportedBlock(rbi.getStorageInfo(),
-            rbi.getBlock(), rbi.getReportedState(), null);
+        isPreviousMessageProcessed =
+            processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
+                rbi.getReportedState(), null);
       }
     }
   }
@@ -4092,8 +4099,14 @@ public class BlockManager implements BlockStatsMXBean {
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
-  
-  private void processAndHandleReportedBlock(
+
+  /**
+   * Process a reported block.
+   * @return true if the block is processed, or false if the block is queued
+   * to be processed later.
+   * @throws IOException
+   */
+  private boolean processAndHandleReportedBlock(
       DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
@@ -4107,7 +4120,7 @@ public class BlockManager implements BlockStatsMXBean {
         isGenStampInFuture(block)) {
       queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
-      return;
+      return false;
     }
 
     // find block by blockId
@@ -4118,7 +4131,7 @@ public class BlockManager implements BlockStatsMXBean {
       blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
           "belong to any file", block, node, block.getNumBytes());
       addToInvalidates(new Block(block), node);
-      return;
+      return true;
     }
 
     BlockUCState ucState = storedBlock.getBlockUCState();
@@ -4127,7 +4140,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     // Ignore replicas already scheduled to be removed from the DN
     if(invalidateBlocks.contains(node, block)) {
-      return;
+      return true;
     }
 
     BlockToMarkCorrupt c = checkReplicaCorrupt(
@@ -4145,14 +4158,14 @@ public class BlockManager implements BlockStatsMXBean {
       } else {
         markBlockAsCorrupt(c, storageInfo, node);
       }
-      return;
+      return true;
     }
 
     if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
       addStoredBlockUnderConstruction(
           new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
           storageInfo);
-      return;
+      return true;
     }
 
     // Add replica if appropriate. If the replica was previously corrupt
@@ -4162,6 +4175,7 @@ public class BlockManager implements BlockStatsMXBean {
             corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
       addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
     }
+    return true;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
new file mode 100644
index 0000000..f5899c0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests corruption of replicas in case of failover.
+ */
+public class TestCorruptionWithFailover {
+
+  @Test
+  public void testCorruptReplicaAfterFailover() throws Exception {
+    Configuration conf = new Configuration();
+    // Enable data to be written, to less replicas in case of pipeline failure.
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, 2);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3)
+        .build()) {
+      cluster.transitionToActive(0);
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem(0);
+      FSDataOutputStream out = dfs.create(new Path("/dir/file"));
+      // Write some data and flush.
+      for (int i = 0; i < 1024 * 1024; i++) {
+        out.write(i);
+      }
+      out.hsync();
+      // Stop one datanode, so as to trigger update pipeline.
+      MiniDFSCluster.DataNodeProperties dn = cluster.stopDataNode(0);
+      // Write some more data and close the file.
+      for (int i = 0; i < 1024 * 1024; i++) {
+        out.write(i);
+      }
+      out.close();
+      BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
+      BlockManager bm1 = cluster.getNamesystem(1).getBlockManager();
+      // Mark datanodes as stale, as they would be after a namenode failover,
+      // to prevent replica deletion.
+      bm0.getDatanodeManager().markAllDatanodesStale();
+      bm1.getDatanodeManager().markAllDatanodesStale();
+      // Restart the datanode
+      cluster.restartDataNode(dn);
+      // The replica on the restarted datanode has an older genstamp, so it
+      // will be marked as CORRUPT.
+      GenericTestUtils.waitFor(() -> bm0.getCorruptBlocks() == 1, 100, 10000);
+
+      // Perform failover to other namenode
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+      // The corrupt count should be same as first namenode.
+      GenericTestUtils.waitFor(() -> bm1.getCorruptBlocks() == 1, 100, 10000);
+    }
+  }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org