Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2019/10/20 01:19:14 UTC

[hadoop] branch branch-3.1 updated: HDFS-14847. Erasure Coding: Blocks are over-replicated while EC decommissioning. Contributed by Fei Hui.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c5b4a2d  HDFS-14847. Erasure Coding: Blocks are over-replicated while EC decommissioning. Contributed by Fei Hui.
c5b4a2d is described below

commit c5b4a2d11563265f0acdb4581126313c4d682094
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Sun Oct 20 06:10:59 2019 +0530

    HDFS-14847. Erasure Coding: Blocks are over-replicated while EC decommissioning. Contributed by Fei Hui.
---
 .../server/blockmanagement/BlockInfoStriped.java   |   4 +-
 .../blockmanagement/DatanodeAdminManager.java      |   5 +
 .../server/blockmanagement/DatanodeDescriptor.java |   7 +-
 .../server/blockmanagement/ErasureCodingWork.java  |  16 ++-
 .../server/blockmanagement/ProvidedStorageMap.java |   3 +-
 .../hadoop/hdfs/TestDecommissionWithStriped.java   | 124 +++++++++++++++++++++
 6 files changed, 153 insertions(+), 6 deletions(-)
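
Before the per-file diffs, a note on the fix itself. When choosing
reconstruction sources for a striped block, the old
findLeavingServiceSources() selected every live source node that was
decommissioning or entering maintenance, even when the internal block that
node holds had already been replicated to an in-service node; the same
internal block could then be copied again, leaving the block group
over-replicated. The patch first records, in a BitSet keyed by internal
block index, which indices already live on in-service nodes, and only
treats a leaving-service node as a source when its index is absent from
that set. A minimal standalone sketch of the selection rule (the arrays
stand in for DatanodeDescriptor state and liveBlockIndicies; they are
illustrative, not the HDFS API):

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.List;

    public class LeavingServiceSourceDemo {
      public static void main(String[] args) {
        // Candidate source nodes for one striped block group.
        boolean[] inService      = { true,  false, false, true };
        boolean[] leavingService = { false, true,  true,  false };
        byte[]    liveBlockIndex = { 0,     0,     1,     2 };

        // Step 1: mark every internal block index already held by an
        // in-service node.
        BitSet onInService = new BitSet();
        for (int i = 0; i < inService.length; i++) {
          if (inService[i]) {
            onInService.set(liveBlockIndex[i]);
          }
        }

        // Step 2: a leaving-service node becomes a source only if its
        // internal block is not already in service; copying it again is
        // what over-replicated the group before this fix.
        List<Integer> srcIndices = new ArrayList<>();
        for (int i = 0; i < leavingService.length; i++) {
          if (leavingService[i] && !onInService.get(liveBlockIndex[i])) {
            srcIndices.add(i);
          }
        }
        // Prints [2]: node 1 is skipped because its index 0 already lives
        // on in-service node 0; node 2's index 1 still needs a copy.
        System.out.println(srcIndices);
      }
    }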

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 790cd77..6256dd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -154,7 +155,8 @@ public class BlockInfoStriped extends BlockInfo {
     return -1;
   }
 
-  byte getStorageBlockIndex(DatanodeStorageInfo storage) {
+  @VisibleForTesting
+  public byte getStorageBlockIndex(DatanodeStorageInfo storage) {
     int i = this.findStorageInfo(storage);
     return i == -1 ? -1 : indices[i];
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
index f30066a..ce92151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
@@ -437,6 +437,11 @@ public class DatanodeAdminManager {
     return monitor.numNodesChecked;
   }
 
+  @VisibleForTesting
+  public Queue<DatanodeDescriptor> getPendingNodes() {
+    return pendingNodes;
+  }
+
   /**
    * Checks to see if datanodes have finished DECOMMISSION_INPROGRESS or
    * ENTERING_MAINTENANCE state.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index a4e43f2..4edf76b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -643,7 +643,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Store block replication work.
    */
-  void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
+  @VisibleForTesting
+  public void addBlockToBeReplicated(Block block,
+      DatanodeStorageInfo[] targets) {
     assert(block != null && targets != null && targets.length > 0);
     replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
@@ -701,7 +703,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return erasurecodeBlocks.size();
   }
 
-  int getNumberOfReplicateBlocks() {
+  @VisibleForTesting
+  public int getNumberOfReplicateBlocks() {
     return replicateBlocks.size();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 147f8cf..dcf1152 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -164,11 +164,23 @@ class ErasureCodingWork extends BlockReconstructionWork {
   }
 
   private List<Integer> findLeavingServiceSources() {
+    // Record which internal block indices are present on in-service nodes.
+    BlockInfoStriped block = (BlockInfoStriped)getBlock();
+    BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
+    for (int i = 0; i < getSrcNodes().length; i++) {
+      if (getSrcNodes()[i].isInService()) {
+        bitSet.set(liveBlockIndicies[i]);
+      }
+    }
+    // If the block lives on a node that is decommissioning or entering
+    // maintenance, and no in-service node holds the same internal block,
+    // add that node to the source list.
     List<Integer> srcIndices = new ArrayList<>();
     for (int i = 0; i < getSrcNodes().length; i++) {
-      if (getSrcNodes()[i].isDecommissionInProgress() ||
+      if ((getSrcNodes()[i].isDecommissionInProgress() ||
           (getSrcNodes()[i].isEnteringMaintenance() &&
-          getSrcNodes()[i].isAlive())) {
+          getSrcNodes()[i].isAlive())) &&
+          !bitSet.get(liveBlockIndicies[i])) {
         srcIndices.add(i);
       }
     }
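
For example, with an RS-6-3 block group (internal block indices 0-8):
suppose decommissioning node dn1 holds index 1 and an earlier
reconstruction pass already copied index 1 to an in-service node. The old
condition still selected dn1 as a source, so a second copy of index 1 was
scheduled and the group ended up over-replicated. With the bitSet built
above, index 1 is already marked as in service and dn1 is skipped.
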
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
index 6303775..ded7ffb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -396,7 +396,8 @@ public class ProvidedStorageMap {
     }
 
     @Override
-    void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
+    public void addBlockToBeReplicated(Block block,
+        DatanodeStorageInfo[] targets) {
       // pick a random datanode, delegate to it
       DatanodeDescriptor node = chooseRandom(targets);
       if (node != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index 7bd85b4..64dd17c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -27,9 +27,11 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.BitSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -43,14 +45,19 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -585,4 +592,121 @@ public class TestDecommissionWithStriped {
     }
     return null;
   }
+
+  /**
+   * Simulate two datanodes (dn0, dn1) in decommission: dn0 replicates its
+   * block successfully, dn1 fails to replicate, and decommissioning proceeds.
+   */
+  @Test (timeout = 120000)
+  public void testDecommissionWithFailedReplicating() throws Exception {
+
+    // Write ec file.
+    Path ecFile = new Path(ecDir, "firstReplicationFailedFile");
+    int writeBytes = cellSize * 6;
+    writeStripedFile(dfs, ecFile, writeBytes);
+
+    // Pick two datanodes holding the EC block and start decommissioning them.
+    // Neither node is yet in pendingNodes of DatanodeAdminManager.
+    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
+        .getAllBlocks();
+    LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
+    DatanodeInfo[] dnList = blk.getLocations();
+    DatanodeDescriptor dn0 = bm.getDatanodeManager()
+        .getDatanode(dnList[0].getDatanodeUuid());
+    dn0.startDecommission();
+    DatanodeDescriptor dn1 = bm.getDatanodeManager()
+        .getDatanode(dnList[1].getDatanodeUuid());
+    dn1.startDecommission();
+
+    assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
+        .getNumPendingNodes());
+
+    // Schedule replication of dn0's internal block to another datanode,
+    // simulating dn0 replicating successfully while dn1 fails to replicate.
+    final byte blockIndex = blk.getBlockIndices()[0];
+    final Block targetBlk = new Block(blk.getBlock().getBlockId() + blockIndex,
+        cellSize, blk.getBlock().getGenerationStamp());
+    DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk);
+    DatanodeDescriptor target = bm.getDatanodeManager()
+        .getDatanode(extraDn.getDatanodeUuid());
+    dn0.addBlockToBeReplicated(targetBlk,
+        new DatanodeStorageInfo[] {target.getStorageInfos()[0]});
+
+    // Wait until dn0's replication succeeds
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return dn0.getNumberOfReplicateBlocks() == 0;
+      }
+    }, 100, 60000);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        Iterator<DatanodeStorageInfo> it =
+            bm.getStoredBlock(targetBlk).getStorageInfos();
+        while(it.hasNext()) {
+          if (it.next().getDatanodeDescriptor().equals(target)) {
+            return true;
+          }
+        }
+        return false;
+      }
+    }, 100, 60000);
+
+    // There are 8 live replicas
+    BlockInfoStriped blockInfo =
+        (BlockInfoStriped)bm.getStoredBlock(
+            new Block(blk.getBlock().getBlockId()));
+    assertEquals(8, bm.countNodes(blockInfo).liveReplicas());
+
+    // Add the 2 nodes to pendingNodes of DatanodeAdminManager
+    bm.getDatanodeManager().getDatanodeAdminManager()
+        .getPendingNodes().add(dn0);
+    bm.getDatanodeManager().getDatanodeAdminManager()
+        .getPendingNodes().add(dn1);
+
+    waitNodeState(dn0, AdminStates.DECOMMISSIONED);
+    waitNodeState(dn1, AdminStates.DECOMMISSIONED);
+
+    // There are 9 live replicas
+    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+
+    // After dn0 & dn1 are decommissioned, all internal blocks (0~8) are present
+    Iterator<DatanodeStorageInfo> it = blockInfo.getStorageInfos();
+    BitSet indexBitSet = new BitSet(9);
+    while(it.hasNext()) {
+      DatanodeStorageInfo storageInfo = it.next();
+      if(storageInfo.getDatanodeDescriptor().equals(dn0)
+          || storageInfo.getDatanodeDescriptor().equals(dn1)) {
+        // Skip decommissioned nodes
+        continue;
+      }
+      byte index = blockInfo.getStorageBlockIndex(storageInfo);
+      indexBitSet.set(index);
+    }
+    for (int i = 0; i < 9; ++i) {
+      assertEquals(true, indexBitSet.get(i));
+    }
+  }
+
+  /**
+   * Get a Datanode which does not contain the block.
+   */
+  private DatanodeInfo getDatanodeOutOfTheBlock(LocatedStripedBlock blk)
+      throws Exception {
+    DatanodeInfo[] allDnInfos = client.datanodeReport(DatanodeReportType.LIVE);
+    DatanodeInfo[] blkDnInfos = blk.getLocations();
+    for (DatanodeInfo dnInfo : allDnInfos) {
+      boolean in = false;
+      for (DatanodeInfo blkDnInfo : blkDnInfos) {
+        if (blkDnInfo.equals(dnInfo)) {
+          in = true;
+        }
+      }
+      if(!in) {
+        return dnInfo;
+      }
+    }
+    return null;
+  }
 }
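
A note on the targetBlk construction in the test: for striped files, the ID
of internal block i is the block group's ID plus i (group IDs are allocated
with their low-order bits zeroed so the index can be added in), which is
why the test builds the internal block as
blk.getBlock().getBlockId() + blockIndex. A tiny standalone illustration of
that arithmetic (the group ID constant here is made up):

    public class InternalBlockIdDemo {
      public static void main(String[] args) {
        // Hypothetical striped block group ID; real group IDs are negative
        // and have their low-order bits zeroed by the ID generator.
        long blockGroupId = -9223372036854775552L;
        for (byte index = 0; index < 9; index++) {
          // Internal block index i maps to id = groupId + i, matching the
          // targetBlk construction in the test above.
          System.out.println("index " + index + " -> id "
              + (blockGroupId + index));
        }
      }
    }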

