You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2019/11/06 16:51:11 UTC

[hadoop] branch branch-3.2 updated: HDFS-14946. Erasure Coding: Block recovery failed during decommissioning. Contributed by Fei Hui.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 5941a91  HDFS-14946. Erasure Coding: Block recovery failed during decommissioning. Contributed by Fei Hui.
5941a91 is described below

commit 5941a91f64882e9717a37c431f2632cd558b6558
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Nov 6 22:10:27 2019 +0530

    HDFS-14946. Erasure Coding: Block recovery failed during decommissioning. Contributed by Fei Hui.
---
 .../hdfs/server/blockmanagement/BlockManager.java  | 39 ++++++++--
 .../hadoop/hdfs/TestDecommissionWithStriped.java   | 88 ++++++++++++++++++++--
 2 files changed, 116 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a291707..41575cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2079,13 +2079,14 @@ public class BlockManager implements BlockStatsMXBean {
             numReplicas.decommissioning() -
             numReplicas.liveEnteringMaintenanceReplicas();
       }
-      byte[] indices = new byte[liveBlockIndices.size()];
-      for (int i = 0 ; i < liveBlockIndices.size(); i++) {
-        indices[i] = liveBlockIndices.get(i);
-      }
-      return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
+      final DatanodeDescriptor[] newSrcNodes =
+          new DatanodeDescriptor[srcNodes.length];
+      byte[] newIndices = new byte[liveBlockIndices.size()];
+      adjustSrcNodesAndIndices((BlockInfoStriped)block,
+          srcNodes, liveBlockIndices, newSrcNodes, newIndices);
+      return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
           containingNodes, liveReplicaNodes, additionalReplRequired,
-          priority, indices);
+          priority, newIndices);
     } else {
       return new ReplicationWork(block, bc, srcNodes,
           containingNodes, liveReplicaNodes, additionalReplRequired,
@@ -2093,6 +2094,32 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  /**
+   * Adjust srcNodes and indices which are used to reconstruction block.
+   * We should guarantee the indexes of first minRequiredSources nodes
+   + are different.
+   */
+  private void adjustSrcNodesAndIndices(BlockInfoStriped block,
+      DatanodeDescriptor[] srcNodes, List<Byte> indices,
+      DatanodeDescriptor[] newSrcNodes, byte[] newIndices) {
+    BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
+    List<Integer> skipIndexList = new ArrayList<>();
+    for (int i = 0, j = 0; i < srcNodes.length; i++) {
+      if (!bitSet.get(indices.get(i))) {
+        bitSet.set(indices.get(i));
+        newSrcNodes[j] = srcNodes[i];
+        newIndices[j++] = indices.get(i);
+      } else {
+        skipIndexList.add(i);
+      }
+    }
+    for(int i = srcNodes.length - skipIndexList.size(), j = 0;
+        i < srcNodes.length; i++, j++) {
+      newSrcNodes[i] = srcNodes[skipIndexList.get(j)];
+      newIndices[i] = indices.get(skipIndexList.get(j));
+    }
+  }
+
   private boolean validateReconstructionWork(BlockReconstructionWork rw) {
     BlockInfo block = rw.getBlock();
     int priority = rw.getPriority();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index b375321..5cbb84a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -436,14 +436,15 @@ public class TestDecommissionWithStriped {
     return new DFSClient(nn.getNameNodeAddress(), conf);
   }
 
-  private void writeStripedFile(DistributedFileSystem dfs, Path ecFile,
-      int writeBytes) throws IOException, Exception {
+  private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile,
+      int writeBytes) throws Exception {
     byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
-    DFSTestUtil.writeFile(dfs, ecFile, new String(bytes));
-    StripedFileTestUtil.waitBlockGroupsReported(dfs, ecFile.toString());
+    DFSTestUtil.writeFile(fs, ecFile, new String(bytes));
+    StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
 
-    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes,
+    StripedFileTestUtil.checkData(fs, ecFile, writeBytes,
         new ArrayList<DatanodeInfo>(), null, blockGroupSize);
+    return bytes;
   }
 
   private void writeConfigFile(Path name, List<String> nodes)
@@ -894,4 +895,81 @@ public class TestDecommissionWithStriped {
     assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
     cleanupFile(dfs, ecFile);
   }
+
+  /**
+   * Test recovery for an ec block, its storage array contains these internal
+   * blocks which are {b0, b1, b2, b3, null, b5, b6, b7, b8, b0, b1, b2,
+   * b3}, array[0]{b0} in decommissioning, array[1-3]{b1, b2, b3} are
+   * in decommissioned. array[4] is null, array[5-12]{b[5-8],b[0-3]} are
+   * in live.
+   */
+  @Test (timeout = 120000)
+  public void testRecoveryWithDecommission() throws Exception {
+    final Path ecFile = new Path(ecDir, "testRecoveryWithDecommission");
+    int writeBytes = cellSize * dataBlocks;
+    byte[] originBytesArray = writeStripedFile(dfs, ecFile, writeBytes);
+    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
+        .getAllBlocks();
+    LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
+    DatanodeInfo[] dnList = blk.getLocations();
+    BlockInfoStriped blockInfo =
+        (BlockInfoStriped)bm.getStoredBlock(
+            new Block(blk.getBlock().getBlockId()));
+
+    // Decommission datanode dn0 contains block b0
+    // Aim to add storageinfo of replicated block b0 to storages[9] of ec block
+    List<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+    decommissionedNodes.add(dnList[0]);
+    decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
+
+    // Now storages of ec block are (b0{decommissioned}, b[1-8]{live},
+    // b0{live})
+    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+    assertEquals(1, bm.countNodes(blockInfo).decommissioned());
+
+    int decommissionNodesNum = 4;
+
+    // Decommission nodes contain blocks of b[0-3]
+    // dn0 has been decommissioned
+    for (int i = 1; i < decommissionNodesNum; i++) {
+      decommissionedNodes.add(dnList[i]);
+    }
+    decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
+
+    // Now storages of ec block are (b[0-3]{decommissioned}, b[4-8]{live},
+    // b0{live}, b[1-3]{live})
+    // There are 9 live and 4 decommissioned internal blocks
+    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+    assertEquals(4, bm.countNodes(blockInfo).decommissioned());
+
+    // There are no reconstruction tasks
+    assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
+        .getNumPendingNodes());
+    assertEquals(0, bm.getUnderReplicatedNotMissingBlocks());
+
+    // Set dn0 in decommissioning
+    // So that the block on dn0 can be used for reconstruction task
+    DatanodeDescriptor dn0 = bm.getDatanodeManager()
+        .getDatanode(dnList[0].getDatanodeUuid());
+    dn0.startDecommission();
+
+    // Stop the datanode contains b4
+    DataNode dn = cluster.getDataNode(
+        dnList[decommissionNodesNum].getIpcPort());
+    cluster.stopDataNode(dnList[decommissionNodesNum].getXferAddr());
+    cluster.setDataNodeDead(dn.getDatanodeId());
+
+    // Now storages of ec block are (b[0]{decommissioning},
+    // b[1-3]{decommissioned}, null, b[5-8]{live}, b0{live}, b[1-3]{live})
+    // There are 8 live and 1 decommissioning internal blocks
+    // Wait for reconstruction EC block.
+    GenericTestUtils.waitFor(
+        () -> bm.countNodes(blockInfo).liveReplicas() == 9,
+        100, 10000);
+
+    byte[] readBytesArray = new byte[writeBytes];
+    StripedFileTestUtil.verifyPread(dfs, ecFile, writeBytes,
+        originBytesArray, readBytesArray, ecPolicy);
+    cleanupFile(dfs, ecFile);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org