Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/05/09 05:46:48 UTC

[05/50] [abbrv] hadoop git commit: HDFS-11609. Some blocks can be permanently lost if nodes are decommissioned while dead. Contributed by Kihwal Lee.

HDFS-11609. Some blocks can be permanently lost if nodes are decommissioned while dead. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/07b98e78
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/07b98e78
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/07b98e78

Branch: refs/heads/HDFS-7240
Commit: 07b98e7830c2214340cb7f434df674057e89df94
Parents: 30fc580
Author: Kihwal Lee <ki...@apache.org>
Authored: Mon May 1 14:19:02 2017 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Mon May 1 14:19:02 2017 -0500

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |  30 ++++-
 .../blockmanagement/LowRedundancyBlocks.java    |   6 +-
 .../namenode/TestDecommissioningStatus.java     | 113 ++++++++++++++++++-
 3 files changed, 139 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/07b98e78/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7309846..e63930a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2031,7 +2031,8 @@ public class BlockManager implements BlockStatsMXBean {
    *
    * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
    * since the former do not have write traffic and hence are less busy.
-   * We do not use already decommissioned nodes as a source.
+   * We do not use already decommissioned nodes as a source, unless there is
+   * no other choice.
    * Otherwise we randomly choose nodes among those that did not reach their
    * replication limits. However, if the recovery work is of the highest
    * priority and all nodes have reached their replication limits, we will
@@ -2067,6 +2068,7 @@ public class BlockManager implements BlockStatsMXBean {
     List<DatanodeDescriptor> srcNodes = new ArrayList<>();
     liveBlockIndices.clear();
     final boolean isStriped = block.isStriped();
+    DatanodeDescriptor decommissionedSrc = null;
 
     BitSet bitSet = isStriped ?
         new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null;
@@ -2085,13 +2087,24 @@ public class BlockManager implements BlockStatsMXBean {
         continue;
       }
 
-      // never use already decommissioned nodes, maintenance node not
-      // suitable for read or unknown state replicas.
-      if (state == null || state == StoredReplicaState.DECOMMISSIONED
+      // Never use maintenance node not suitable for read
+      // or unknown state replicas.
+      if (state == null
           || state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {
         continue;
       }
 
+      // Save the live decommissioned replica in case we need it. Such replicas
+      // are normally not used for replication, but if nothing else is
+      // available, one can be selected as a source.
+      if (state == StoredReplicaState.DECOMMISSIONED) {
+        if (decommissionedSrc == null ||
+            ThreadLocalRandom.current().nextBoolean()) {
+          decommissionedSrc = node;
+        }
+        continue;
+      }
+
       if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
           && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
           && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
@@ -2123,6 +2136,13 @@ public class BlockManager implements BlockStatsMXBean {
         srcNodes.set(0, node);
       }
     }
+
+    // Pick a live decommissioned replica, if nothing else is available.
+    if (!isStriped && nodesContainingLiveReplicas.isEmpty() &&
+        srcNodes.isEmpty() && decommissionedSrc != null) {
+      srcNodes.add(decommissionedSrc);
+    }
+
     return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
   }
 
@@ -3036,7 +3056,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     int curReplicaDelta;
     if (result == AddBlockResult.ADDED) {
-      curReplicaDelta = 1;
+      curReplicaDelta = (node.isDecommissioned()) ? 0 : 1;
       if (logEveryBlock) {
         blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})",
             node, storedBlock, storedBlock.getNumBytes());
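
The BlockManager change above amounts to one policy shift: a live replica on an already-decommissioned node is remembered (one is kept at random when there are several) and used as a reconstruction source only when no other source is available, instead of being skipped outright; the patch further limits the fallback to non-striped blocks with no other live replicas. A minimal standalone sketch of that selection policy follows; the names (ReplicaState, SourcePickerSketch, pickSources) are invented for illustration and are not HDFS classes or APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

public class SourcePickerSketch {
  // Simplified replica states; the real code uses StoredReplicaState.
  enum ReplicaState { LIVE, DECOMMISSION_INPROGRESS, DECOMMISSIONED, UNUSABLE }

  static List<String> pickSources(Map<String, ReplicaState> replicas) {
    List<String> sources = new ArrayList<>();
    String decommissionedSrc = null;

    for (Map.Entry<String, ReplicaState> e : replicas.entrySet()) {
      switch (e.getValue()) {
        case UNUSABLE:
          continue;                  // never read from unknown/unreadable replicas
        case DECOMMISSIONED:
          // Keep one decommissioned replica in reserve; a coin flip mirrors the
          // ThreadLocalRandom choice in the patch when several are available.
          if (decommissionedSrc == null
              || ThreadLocalRandom.current().nextBoolean()) {
            decommissionedSrc = e.getKey();
          }
          continue;
        default:
          sources.add(e.getKey());   // normal source candidates
      }
    }

    // Last resort: use the decommissioned replica rather than leaving the
    // block with no source at all (the data-loss scenario in HDFS-11609).
    if (sources.isEmpty() && decommissionedSrc != null) {
      sources.add(decommissionedSrc);
    }
    return sources;
  }

  public static void main(String[] args) {
    // One replica is unreadable and the other sits on a decommissioned node;
    // prints [dn0], the decommissioned replica chosen as a last resort.
    System.out.println(pickSources(Map.of(
        "dn0", ReplicaState.DECOMMISSIONED,
        "dn1", ReplicaState.UNUSABLE)));
  }
}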

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07b98e78/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
index 3a26f4a..1a38480 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
@@ -346,9 +346,9 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
         " curPri  " + curPri +
         " oldPri  " + oldPri);
     }
-    if(oldPri != curPri) {
-      remove(block, oldPri);
-    }
+    // oldPri is mostly correct, but not always. If not found with oldPri,
+    // other levels will be searched until the block is found & removed.
+    remove(block, oldPri);
     if(priorityQueues.get(curPri).add(block)) {
       NameNode.blockStateChangeLog.debug(
           "BLOCK* NameSystem.LowRedundancyBlock.update: {} has only {} "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/07b98e78/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index 8bdaa74..3cf025c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -72,6 +73,7 @@ public class TestDecommissioningStatus {
   private static FileSystem fileSys;
   private static HostsFileWriter hostsFileWriter;
   private static Configuration conf;
+  private Logger LOG;
 
   final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
   
@@ -89,8 +91,7 @@ public class TestDecommissioningStatus {
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(
         DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 4);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
-        1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
 
@@ -100,6 +101,7 @@ public class TestDecommissioningStatus {
     cluster.getNamesystem().getBlockManager().getDatanodeManager()
         .setHeartbeatExpireInterval(3000);
     Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
+    LOG = Logger.getLogger(TestDecommissioningStatus.class);
   }
 
   @After
@@ -366,4 +368,111 @@ public class TestDecommissioningStatus {
     hostsFileWriter.initExcludeHost("");
     dm.refreshNodes(conf);
   }
+
+  @Test(timeout=120000)
+  public void testDecommissionLosingData() throws Exception {
+    ArrayList<String> nodes = new ArrayList<String>(2);
+    FSNamesystem fsn = cluster.getNamesystem();
+    BlockManager bm = fsn.getBlockManager();
+    DatanodeManager dm = bm.getDatanodeManager();
+    Path file1 = new Path("decommissionLosingData.dat");
+    DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize,
+        (short)2, seed);
+    Thread.sleep(1000);
+
+    // Shutdown dn1
+    LOG.info("Shutdown dn1");
+    DatanodeID dnID = cluster.getDataNodes().get(1).getDatanodeId();
+    String dnName = dnID.getXferAddr();
+    DatanodeDescriptor dnDescriptor1 = dm.getDatanode(dnID);
+    nodes.add(dnName);
+    DataNodeProperties stoppedDN1 = cluster.stopDataNode(1);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Shutdown dn0
+    LOG.info("Shutdown dn0");
+    dnID = cluster.getDataNodes().get(0).getDatanodeId();
+    dnName = dnID.getXferAddr();
+    DatanodeDescriptor dnDescriptor0 = dm.getDatanode(dnID);
+    nodes.add(dnName);
+    DataNodeProperties stoppedDN0 = cluster.stopDataNode(0);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Decommission the nodes.
+    LOG.info("Decommissioning nodes");
+    hostsFileWriter.initExcludeHosts(nodes);
+    dm.refreshNodes(conf);
+    BlockManagerTestUtil.recheckDecommissionState(dm);
+    assertTrue(dnDescriptor0.isDecommissioned());
+    assertTrue(dnDescriptor1.isDecommissioned());
+
+    // All nodes are dead and decommed. Blocks should be missing.
+    long  missingBlocks = bm.getMissingBlocksCount();
+    long underreplicated = bm.getUnderReplicatedBlocksCount();
+    assertTrue(missingBlocks > 0);
+    assertTrue(underreplicated > 0);
+
+    // Bring back dn0
+    LOG.info("Bring back dn0");
+    cluster.restartDataNode(stoppedDN0, true);
+    do {
+      dnID = cluster.getDataNodes().get(0).getDatanodeId();
+    } while (dnID == null);
+    dnDescriptor0 = dm.getDatanode(dnID);
+    // Wait until it sends a block report.
+    while (dnDescriptor0.numBlocks() == 0) {
+      Thread.sleep(100);
+    }
+
+    // Bring back dn1
+    LOG.info("Bring back dn1");
+    cluster.restartDataNode(stoppedDN1, true);
+    do {
+      dnID = cluster.getDataNodes().get(1).getDatanodeId();
+    } while (dnID == null);
+    dnDescriptor1 = dm.getDatanode(dnID);
+    // Wait until it sends a block report.
+    while (dnDescriptor1.numBlocks() == 0) {
+      Thread.sleep(100);
+    }
+
+    // Blocks should be still be under-replicated
+    Thread.sleep(2000);  // Let replication monitor run
+    assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount());
+
+    // Start up a node.
+    LOG.info("Starting two more nodes");
+    cluster.startDataNodes(conf, 2, true, null, null);
+    cluster.waitActive();
+    // Replication should fix it.
+    int count = 0;
+    while((bm.getUnderReplicatedBlocksCount() > 0 ||
+        bm.getPendingReconstructionBlocksCount() > 0) &&
+        count++ < 10) {
+      Thread.sleep(1000);
+    }
+
+    assertEquals(0, bm.getUnderReplicatedBlocksCount());
+    assertEquals(0, bm.getPendingReconstructionBlocksCount());
+    assertEquals(0, bm.getMissingBlocksCount());
+
+    // Shutdown the extra nodes.
+    dnID = cluster.getDataNodes().get(3).getDatanodeId();
+    cluster.stopDataNode(3);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    dnID = cluster.getDataNodes().get(2).getDatanodeId();
+    cluster.stopDataNode(2);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Call refreshNodes on FSNamesystem with empty exclude file to remove the
+    // datanode from decommissioning list and make it available again.
+    hostsFileWriter.initExcludeHost("");
+    dm.refreshNodes(conf);
+    fileSys.delete(file1, false);
+  }
 }

