Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2019/11/06 16:51:11 UTC
[hadoop] branch branch-3.2 updated: HDFS-14946. Erasure Coding:
Block recovery failed during decommissioning. Contributed by Fei Hui.
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 5941a91 HDFS-14946. Erasure Coding: Block recovery failed during decommissioning. Contributed by Fei Hui.
5941a91 is described below
commit 5941a91f64882e9717a37c431f2632cd558b6558
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Nov 6 22:10:27 2019 +0530
HDFS-14946. Erasure Coding: Block recovery failed during decommissioning. Contributed by Fei Hui.
---
.../hdfs/server/blockmanagement/BlockManager.java | 39 ++++++++--
.../hadoop/hdfs/TestDecommissionWithStriped.java | 88 ++++++++++++++++++++--
2 files changed, 116 insertions(+), 11 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a291707..41575cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2079,13 +2079,14 @@ public class BlockManager implements BlockStatsMXBean {
numReplicas.decommissioning() -
numReplicas.liveEnteringMaintenanceReplicas();
}
- byte[] indices = new byte[liveBlockIndices.size()];
- for (int i = 0 ; i < liveBlockIndices.size(); i++) {
- indices[i] = liveBlockIndices.get(i);
- }
- return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
+ final DatanodeDescriptor[] newSrcNodes =
+ new DatanodeDescriptor[srcNodes.length];
+ byte[] newIndices = new byte[liveBlockIndices.size()];
+ adjustSrcNodesAndIndices((BlockInfoStriped)block,
+ srcNodes, liveBlockIndices, newSrcNodes, newIndices);
+ return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
- priority, indices);
+ priority, newIndices);
} else {
return new ReplicationWork(block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
@@ -2093,6 +2094,32 @@ public class BlockManager implements BlockStatsMXBean {
}
}
+ /**
+ * Adjust srcNodes and indices which are used to reconstruct the block.
+ * We should guarantee that the indexes of the first minRequiredSources
+ * nodes are different.
+ */
+ private void adjustSrcNodesAndIndices(BlockInfoStriped block,
+ DatanodeDescriptor[] srcNodes, List<Byte> indices,
+ DatanodeDescriptor[] newSrcNodes, byte[] newIndices) {
+ BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
+ List<Integer> skipIndexList = new ArrayList<>();
+ for (int i = 0, j = 0; i < srcNodes.length; i++) {
+ if (!bitSet.get(indices.get(i))) {
+ bitSet.set(indices.get(i));
+ newSrcNodes[j] = srcNodes[i];
+ newIndices[j++] = indices.get(i);
+ } else {
+ skipIndexList.add(i);
+ }
+ }
+ for(int i = srcNodes.length - skipIndexList.size(), j = 0;
+ i < srcNodes.length; i++, j++) {
+ newSrcNodes[i] = srcNodes[skipIndexList.get(j)];
+ newIndices[i] = indices.get(skipIndexList.get(j));
+ }
+ }
+
private boolean validateReconstructionWork(BlockReconstructionWork rw) {
BlockInfo block = rw.getBlock();
int priority = rw.getPriority();
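The new adjustSrcNodesAndIndices method above is essentially a reordering pass over (source node, block index) pairs: any node whose internal-block index has already been seen is pushed to the tail, so the leading entries all carry distinct indices. A minimal standalone sketch of that idea follows; the SrcNodeReorderSketch class, the reorder method, and the use of plain String node names in place of DatanodeDescriptor are illustrative assumptions, not the committed BlockManager code.

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

/** Hypothetical standalone illustration of the reordering idea. */
public class SrcNodeReorderSketch {

  /**
   * Copy (node, index) pairs into newSrcNodes/newIndices so that the
   * leading entries all have distinct internal-block indices; entries
   * with an already-seen index are appended at the tail.
   */
  static void reorder(String[] srcNodes, List<Byte> indices,
      String[] newSrcNodes, byte[] newIndices, int totalBlockNum) {
    BitSet seen = new BitSet(totalBlockNum);
    List<Integer> skipped = new ArrayList<>();
    int j = 0;
    for (int i = 0; i < srcNodes.length; i++) {
      byte idx = indices.get(i);
      if (!seen.get(idx)) {
        // First time this internal-block index shows up: keep it up front.
        seen.set(idx);
        newSrcNodes[j] = srcNodes[i];
        newIndices[j++] = idx;
      } else {
        // Duplicate index (e.g. a decommissioning replica of a block that
        // also has a live replica): defer it to the tail.
        skipped.add(i);
      }
    }
    for (int k = 0; k < skipped.size(); k++, j++) {
      newSrcNodes[j] = srcNodes[skipped.get(k)];
      newIndices[j] = indices.get(skipped.get(k));
    }
  }
}
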
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index b375321..5cbb84a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -436,14 +436,15 @@ public class TestDecommissionWithStriped {
return new DFSClient(nn.getNameNodeAddress(), conf);
}
- private void writeStripedFile(DistributedFileSystem dfs, Path ecFile,
- int writeBytes) throws IOException, Exception {
+ private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile,
+ int writeBytes) throws Exception {
byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
- DFSTestUtil.writeFile(dfs, ecFile, new String(bytes));
- StripedFileTestUtil.waitBlockGroupsReported(dfs, ecFile.toString());
+ DFSTestUtil.writeFile(fs, ecFile, new String(bytes));
+ StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
- StripedFileTestUtil.checkData(dfs, ecFile, writeBytes,
+ StripedFileTestUtil.checkData(fs, ecFile, writeBytes,
new ArrayList<DatanodeInfo>(), null, blockGroupSize);
+ return bytes;
}
private void writeConfigFile(Path name, List<String> nodes)
@@ -894,4 +895,81 @@ public class TestDecommissionWithStriped {
assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
cleanupFile(dfs, ecFile);
}
+
+ /**
+ * Test recovery for an EC block whose storage array contains the internal
+ * blocks {b0, b1, b2, b3, null, b5, b6, b7, b8, b0, b1, b2, b3}:
+ * array[0]{b0} is decommissioning, array[1-3]{b1, b2, b3} are
+ * decommissioned, array[4] is null, and array[5-12]{b[5-8], b[0-3]}
+ * are live.
+ */
+ @Test (timeout = 120000)
+ public void testRecoveryWithDecommission() throws Exception {
+ final Path ecFile = new Path(ecDir, "testRecoveryWithDecommission");
+ int writeBytes = cellSize * dataBlocks;
+ byte[] originBytesArray = writeStripedFile(dfs, ecFile, writeBytes);
+ List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
+ .getAllBlocks();
+ LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
+ DatanodeInfo[] dnList = blk.getLocations();
+ BlockInfoStriped blockInfo =
+ (BlockInfoStriped)bm.getStoredBlock(
+ new Block(blk.getBlock().getBlockId()));
+
+ // Decommission datanode dn0 contains block b0
+ // Aim to add storageinfo of replicated block b0 to storages[9] of ec block
+ List<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+ decommissionedNodes.add(dnList[0]);
+ decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
+
+ // Now storages of ec block are (b0{decommissioned}, b[1-8]{live},
+ // b0{live})
+ assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+ assertEquals(1, bm.countNodes(blockInfo).decommissioned());
+
+ int decommissionNodesNum = 4;
+
+ // Decommission nodes contain blocks of b[0-3]
+ // dn0 has been decommissioned
+ for (int i = 1; i < decommissionNodesNum; i++) {
+ decommissionedNodes.add(dnList[i]);
+ }
+ decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
+
+ // Now storages of ec block are (b[0-3]{decommissioned}, b[4-8]{live},
+ // b0{live}, b[1-3]{live})
+ // There are 9 live and 4 decommissioned internal blocks
+ assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+ assertEquals(4, bm.countNodes(blockInfo).decommissioned());
+
+ // There are no reconstruction tasks
+ assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
+ .getNumPendingNodes());
+ assertEquals(0, bm.getUnderReplicatedNotMissingBlocks());
+
+ // Set dn0 in decommissioning
+ // So that the block on dn0 can be used for reconstruction task
+ DatanodeDescriptor dn0 = bm.getDatanodeManager()
+ .getDatanode(dnList[0].getDatanodeUuid());
+ dn0.startDecommission();
+
+ // Stop the datanode that contains b4
+ DataNode dn = cluster.getDataNode(
+ dnList[decommissionNodesNum].getIpcPort());
+ cluster.stopDataNode(dnList[decommissionNodesNum].getXferAddr());
+ cluster.setDataNodeDead(dn.getDatanodeId());
+
+ // Now storages of ec block are (b[0]{decommissioning},
+ // b[1-3]{decommissioned}, null, b[5-8]{live}, b0{live}, b[1-3]{live})
+ // There are 8 live and 1 decommissioning internal blocks
+ // Wait for the EC block to be reconstructed.
+ GenericTestUtils.waitFor(
+ () -> bm.countNodes(blockInfo).liveReplicas() == 9,
+ 100, 10000);
+
+ byte[] readBytesArray = new byte[writeBytes];
+ StripedFileTestUtil.verifyPread(dfs, ecFile, writeBytes,
+ originBytesArray, readBytesArray, ecPolicy);
+ cleanupFile(dfs, ecFile);
+ }
}
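As a rough usage example of the sketch above, the hypothetical driver below feeds in an ordering where the duplicated indices 0-3 appear before some of the distinct ones; after reordering, the first eight entries carry the eight distinct indices, which is the property the decommissioning scenario in the new test depends on. The class name, datanode names, and index ordering are illustrative only, not taken from the commit.

import java.util.Arrays;
import java.util.List;

/** Hypothetical driver for SrcNodeReorderSketch; values are illustrative. */
public class SrcNodeReorderDemo {
  public static void main(String[] args) {
    // Internal-block indices reported by the candidate source nodes.
    // b0-b3 appear twice (duplicates at positions 4-7 in this ordering).
    List<Byte> indices = Arrays.asList(
        (byte) 0, (byte) 1, (byte) 2, (byte) 3,
        (byte) 0, (byte) 1, (byte) 2, (byte) 3,
        (byte) 5, (byte) 6, (byte) 7, (byte) 8);
    String[] srcNodes = new String[indices.size()];
    for (int i = 0; i < srcNodes.length; i++) {
      srcNodes[i] = "dn" + i;   // placeholder datanode names
    }
    String[] newSrcNodes = new String[srcNodes.length];
    byte[] newIndices = new byte[srcNodes.length];
    // RS(6,3) has 9 internal blocks, indices 0-8.
    SrcNodeReorderSketch.reorder(srcNodes, indices, newSrcNodes, newIndices, 9);
    // Prints [0, 1, 2, 3, 5, 6, 7, 8, 0, 1, 2, 3]: distinct indices first,
    // duplicates pushed to the tail.
    System.out.println(Arrays.toString(newIndices));
    System.out.println(Arrays.toString(newSrcNodes));
  }
}
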
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org