You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2020/12/15 05:58:35 UTC
[hadoop] 01/02: HDFS-15187. CORRUPT replica mismatch between
namenodes after failover. Contributed by Ayush Saxena.
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit a68ca4a292a430640ba34e46389fc7aaf785c071
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Mon Feb 24 20:38:04 2020 +0530
HDFS-15187. CORRUPT replica mismatch between namenodes after failover. Contributed by Ayush Saxena.
---
.../hdfs/server/blockmanagement/BlockManager.java | 32 ++++++---
.../TestCorruptionWithFailover.java | 81 ++++++++++++++++++++++
2 files changed, 104 insertions(+), 9 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8cff375..c128666 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3162,6 +3162,7 @@ public class BlockManager implements BlockStatsMXBean {
private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
throws IOException {
+ boolean isPreviousMessageProcessed = true;
for (ReportedBlockInfo rbi : rbis) {
LOG.debug("Processing previously queued message {}", rbi);
if (rbi.getReportedState() == null) {
@@ -3169,9 +3170,15 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
removeStoredBlock(getStoredBlock(rbi.getBlock()),
storageInfo.getDatanodeDescriptor());
+ } else if (!isPreviousMessageProcessed) {
+ // if the previous IBR processing was skipped, skip processing all
+ // further IBRs as well, to ensure the same sequence of processing.
+ queueReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
+ rbi.getReportedState(), QUEUE_REASON_FUTURE_GENSTAMP);
} else {
- processAndHandleReportedBlock(rbi.getStorageInfo(),
- rbi.getBlock(), rbi.getReportedState(), null);
+ isPreviousMessageProcessed =
+ processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(),
+ rbi.getReportedState(), null);
}
}
}
@@ -4079,8 +4086,14 @@ public class BlockManager implements BlockStatsMXBean {
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
}
-
- private void processAndHandleReportedBlock(
+
+ /**
+ * Process a reported block.
+ * @return true if the block is processed, or false if the block is queued
+ * to be processed later.
+ * @throws IOException
+ */
+ private boolean processAndHandleReportedBlock(
DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
@@ -4094,7 +4107,7 @@ public class BlockManager implements BlockStatsMXBean {
isGenStampInFuture(block)) {
queueReportedBlock(storageInfo, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
- return;
+ return false;
}
// find block by blockId
@@ -4105,7 +4118,7 @@ public class BlockManager implements BlockStatsMXBean {
blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
"belong to any file", block, node, block.getNumBytes());
addToInvalidates(new Block(block), node);
- return;
+ return true;
}
BlockUCState ucState = storedBlock.getBlockUCState();
@@ -4114,7 +4127,7 @@ public class BlockManager implements BlockStatsMXBean {
// Ignore replicas already scheduled to be removed from the DN
if(invalidateBlocks.contains(node, block)) {
- return;
+ return true;
}
BlockToMarkCorrupt c = checkReplicaCorrupt(
@@ -4132,14 +4145,14 @@ public class BlockManager implements BlockStatsMXBean {
} else {
markBlockAsCorrupt(c, storageInfo, node);
}
- return;
+ return true;
}
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
addStoredBlockUnderConstruction(
new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
storageInfo);
- return;
+ return true;
}
// Add replica if appropriate. If the replica was previously corrupt
@@ -4149,6 +4162,7 @@ public class BlockManager implements BlockStatsMXBean {
corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
}
+ return true;
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
new file mode 100644
index 0000000..f5899c0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptionWithFailover.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests corruption of replicas in case of failover.
+ */
+public class TestCorruptionWithFailover {
+
+ @Test
+ public void testCorruptReplicaAfterFailover() throws Exception {
+ Configuration conf = new Configuration();
+ // Allow data to be written to fewer replicas in case of pipeline failure.
+ conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ MIN_REPLICATION, 2);
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3)
+ .build()) {
+ cluster.transitionToActive(0);
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem(0);
+ FSDataOutputStream out = dfs.create(new Path("/dir/file"));
+ // Write some data and flush.
+ for (int i = 0; i < 1024 * 1024; i++) {
+ out.write(i);
+ }
+ out.hsync();
+ // Stop one datanode, so as to trigger update pipeline.
+ MiniDFSCluster.DataNodeProperties dn = cluster.stopDataNode(0);
+ // Write some more data and close the file.
+ for (int i = 0; i < 1024 * 1024; i++) {
+ out.write(i);
+ }
+ out.close();
+ BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
+ BlockManager bm1 = cluster.getNamesystem(1).getBlockManager();
+ // Mark datanodes as stale, as they would be marked if a namenode went
+ // through a failover, to prevent replica deletion.
+ bm0.getDatanodeManager().markAllDatanodesStale();
+ bm1.getDatanodeManager().markAllDatanodesStale();
+ // Restart the datanode
+ cluster.restartDataNode(dn);
+ // The replica from the datanode will have a lower genstamp, so it
+ // will be marked as CORRUPT.
+ GenericTestUtils.waitFor(() -> bm0.getCorruptBlocks() == 1, 100, 10000);
+
+ // Perform failover to other namenode
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+ cluster.waitActive(1);
+ // The corrupt count should be the same as on the first namenode.
+ GenericTestUtils.waitFor(() -> bm1.getCorruptBlocks() == 1, 100, 10000);
+ }
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org