Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2014/12/16 17:32:17 UTC

hadoop git commit: HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport latency. Contributed by Ming Ma. (cherry picked from commit b7923a356e9f111619375b94d12749d634069347)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 a1b8b9ca5 -> b70c0c295


HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport latency. Contributed by Ming Ma.
(cherry picked from commit b7923a356e9f111619375b94d12749d634069347)
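
What the patch does, in short: previously rescanPostponedMisreplicatedBlocks() walked the
entire postponedMisreplicatedBlocks set under the namesystem write lock, and it was invoked
synchronously from block report processing and from datanode removal (see the removed code
in the BlockManager.java hunks below), so a large postponed set directly delayed blockReport
handling. This change moves the rescan into the ReplicationMonitor background loop, bounds
each pass to a configurable number of blocks (dfs.namenode.blocks.per.postponedblocks.rescan,
default 10000), and starts each pass at a random offset so blocks that keep getting postponed
cannot permanently starve the rest of the set.

Below is a minimal, self-contained sketch of that bounded random-offset rescan technique.
It is an illustration only: the class name, the resolve() stand-in, and the use of
java.util.LinkedHashSet over Long IDs are assumptions made for the example; the real
BlockManager iterates its postponedMisreplicatedBlocks set of Block objects under the
FSNamesystem write lock, as shown in the diff below.

    import java.util.Iterator;
    import java.util.LinkedHashSet;
    import java.util.Random;
    import java.util.Set;

    public class PostponedRescanSketch {
      private final Set<Long> postponed = new LinkedHashSet<>();
      private final Random random = new Random();

      /** Add a block ID to the postponed set. */
      void postpone(long blockId) {
        postponed.add(blockId);
      }

      /**
       * Process at most blocksPerRescan entries per call, starting at a
       * random offset so a stuck prefix cannot starve the rest of the set.
       */
      void rescan(long blocksPerRescan) {
        if (postponed.isEmpty()) {
          return;
        }
        long startIndex = 0;
        long base = postponed.size() - blocksPerRescan;
        if (base > 0) {
          // Java's % can yield a negative value; shift it into [0, base].
          startIndex = random.nextLong() % (base + 1);
          if (startIndex < 0) {
            startIndex += base + 1;
          }
        }
        Iterator<Long> it = postponed.iterator();
        for (long skipped = 0; skipped < startIndex; skipped++) {
          it.next();
        }
        for (long scanned = 0; it.hasNext() && scanned < blocksPerRescan;
            scanned++) {
          long blockId = it.next();
          if (resolve(blockId)) { // stand-in for processMisReplicatedBlock()
            it.remove();          // drop entries that no longer need postponing
          }
        }
      }

      /** Illustrative stand-in: pretend even block IDs resolve this pass. */
      private boolean resolve(long blockId) {
        return blockId % 2 == 0;
      }

      public static void main(String[] args) {
        PostponedRescanSketch s = new PostponedRescanSketch();
        for (long id = 0; id < 25_000; id++) {
          s.postpone(id);
        }
        s.rescan(10_000); // one bounded pass over a random 10000-block window
        System.out.println(s.postponed.size() + " blocks still postponed");
      }
    }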


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b70c0c29
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b70c0c29
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b70c0c29

Branch: refs/heads/branch-2
Commit: b70c0c295cab04073d7889b6d7c893c29ba5b7ba
Parents: a1b8b9c
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Dec 16 10:31:57 2014 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Tue Dec 16 10:31:57 2014 -0600

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +
 .../server/blockmanagement/BlockManager.java    | 113 +++++++++++--------
 .../server/blockmanagement/DatanodeManager.java |  14 ++-
 .../src/main/resources/hdfs-default.xml         |   8 ++
 .../blockmanagement/BlockManagerTestUtil.java   |   8 ++
 .../hdfs/server/namenode/ha/TestDNFencing.java  |  19 +++-
 .../ha/TestDNFencingWithReplication.java        |   4 +
 8 files changed, 124 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b70c0c29/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 49188a8..8a2c115 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -338,6 +338,9 @@ Release 2.7.0 - UNRELEASED
     
     HDFS-7516. Fix findbugs warnings in hdfs-nfs project. (brandonli)
 
+    HDFS-6425. Large postponedMisreplicatedBlocks has impact on blockReport
+    latency. (Ming Ma via kihwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b70c0c29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 590ba2a..5305d2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -326,6 +326,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
   public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
 
+  // Number of blocks to rescan for each iteration of postponedMisreplicatedBlocks.
+  public static final String DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY = "dfs.namenode.blocks.per.postponedblocks.rescan";
+  public static final long DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT = 10000;
+
   // Replication monitoring related keys
   public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
       "dfs.namenode.invalidate.work.pct.per.iteration";

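For reference, a minimal sketch of overriding the new key programmatically (the key constant
and its 10000 default come from the hunk above; the rest of the snippet assumes only the
standard org.apache.hadoop.conf.Configuration API):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class PostponedRescanConfExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Rescan only 1000 postponed blocks per ReplicationMonitor iteration
        // instead of the default 10000 -- a smaller value shortens each
        // write-lock hold at the cost of draining the queue in more passes.
        conf.setLong(
            DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
            1000L);
        System.out.println(conf.get(
            DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY));
      }
    }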
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b70c0c29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1ff9674..23ba10a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1050,21 +1050,6 @@ public class BlockManager {
 
     node.resetBlocks();
     invalidateBlocks.remove(node);
-    
-    // If the DN hasn't block-reported since the most recent
-    // failover, then we may have been holding up on processing
-    // over-replicated blocks because of it. But we can now
-    // process those blocks.
-    boolean stale = false;
-    for(DatanodeStorageInfo storage : node.getStorageInfos()) {
-      if (storage.areBlockContentsStale()) {
-        stale = true;
-        break;
-      }
-    }
-    if (stale) {
-      rescanPostponedMisreplicatedBlocks();
-    }
   }
 
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
@@ -1821,17 +1806,7 @@ public class BlockManager {
         invalidatedBlocks = processReport(storageInfo, newReport);
       }
       
-      // Now that we have an up-to-date block report, we know that any
-      // deletions from a previous NN iteration have been accounted for.
-      boolean staleBefore = storageInfo.areBlockContentsStale();
       storageInfo.receivedBlockReport();
-      if (staleBefore && !storageInfo.areBlockContentsStale()) {
-        LOG.info("BLOCK* processReport: Received first block report from "
-            + storage + " after starting up or becoming active. Its block "
-            + "contents are no longer considered stale");
-        rescanPostponedMisreplicatedBlocks();
-      }
-      
     } finally {
       endTime = Time.now();
       namesystem.writeUnlock();
@@ -1860,31 +1835,74 @@ public class BlockManager {
   /**
    * Rescan the list of blocks which were previously postponed.
    */
-  private void rescanPostponedMisreplicatedBlocks() {
-    for (Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
-         it.hasNext();) {
-      Block b = it.next();
-      
-      BlockInfo bi = blocksMap.getStoredBlock(b);
-      if (bi == null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
-              "Postponed mis-replicated block " + b + " no longer found " +
-              "in block map.");
+  void rescanPostponedMisreplicatedBlocks() {
+    if (getPostponedMisreplicatedBlocksCount() == 0) {
+      return;
+    }
+    long startTimeRescanPostponedMisReplicatedBlocks = Time.now();
+    long startPostponedMisReplicatedBlocksCount =
+        getPostponedMisreplicatedBlocksCount();
+    namesystem.writeLock();
+    try {
+      // blocksPerRescan is the configured number of blocks per rescan.
+      // Randomly select blocksPerRescan consecutive blocks from the HashSet
+      // when the number of blocks remaining is larger than blocksPerRescan.
+      // The reason we don't always pick the first blocksPerRescan blocks is
+      // to handle the case where some datanodes remain in a content-stale
+      // state for a long time: their stuck blocks would otherwise pin the
+      // first blocksPerRescan slots and starve the rest of the set.
+      int i = 0;
+      long startIndex = 0;
+      long blocksPerRescan =
+          datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
+      long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
+      if (base > 0) {
+        startIndex = DFSUtil.getRandom().nextLong() % (base+1);
+        if (startIndex < 0) {
+          startIndex += (base+1);
         }
-        it.remove();
-        postponedMisreplicatedBlocksCount.decrementAndGet();
-        continue;
       }
-      MisReplicationResult res = processMisReplicatedBlock(bi);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
-            "Re-scanned block " + b + ", result is " + res);
+      Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
+      for (int tmp = 0; tmp < startIndex; tmp++) {
+        it.next();
       }
-      if (res != MisReplicationResult.POSTPONE) {
-        it.remove();
-        postponedMisreplicatedBlocksCount.decrementAndGet();
+
+      for (;it.hasNext(); i++) {
+        Block b = it.next();
+        if (i >= blocksPerRescan) {
+          break;
+        }
+
+        BlockInfo bi = blocksMap.getStoredBlock(b);
+        if (bi == null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+                "Postponed mis-replicated block " + b + " no longer found " +
+                "in block map.");
+          }
+          it.remove();
+          postponedMisreplicatedBlocksCount.decrementAndGet();
+          continue;
+        }
+        MisReplicationResult res = processMisReplicatedBlock(bi);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+              "Re-scanned block " + b + ", result is " + res);
+        }
+        if (res != MisReplicationResult.POSTPONE) {
+          it.remove();
+          postponedMisreplicatedBlocksCount.decrementAndGet();
+        }
       }
+    } finally {
+      namesystem.writeUnlock();
+      long endPostponedMisReplicatedBlocksCount =
+          getPostponedMisreplicatedBlocksCount();
+      LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
+          (Time.now() - startTimeRescanPostponedMisReplicatedBlocks) +
+          " msecs. " + endPostponedMisReplicatedBlocksCount +
+          " blocks are left. " + (startPostponedMisReplicatedBlocksCount -
+          endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
     }
   }
   
@@ -3583,6 +3601,7 @@ public class BlockManager {
           if (namesystem.isPopulatingReplQueues()) {
             computeDatanodeWork();
             processPendingReplications();
+            rescanPostponedMisreplicatedBlocks();
           }
           Thread.sleep(replicationRecheckInterval);
         } catch (Throwable t) {
@@ -3651,6 +3670,8 @@ public class BlockManager {
     excessReplicateMap.clear();
     invalidateBlocks.clear();
     datanodeManager.clearPendingQueues();
+    postponedMisreplicatedBlocks.clear();
+    postponedMisreplicatedBlocksCount.set(0);
   };
   
 

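A note on the start-index arithmetic in the hunk above: Java's remainder operator can return
a negative value for a negative dividend, so nextLong() % (base + 1) lies in the open range
(-(base + 1), base + 1), and the subsequent "if (startIndex < 0) startIndex += (base + 1)"
shifts it into [0, base]. A tiny standalone check of that property (a sketch, not NameNode
code; the 15000 figure is an illustrative base, e.g. 25000 postponed blocks minus 10000 per
rescan):

    import java.util.Random;

    public class StartIndexCheck {
      public static void main(String[] args) {
        Random r = new Random();
        long base = 15000;
        for (int i = 0; i < 1_000_000; i++) {
          long s = r.nextLong() % (base + 1); // may be negative in Java
          if (s < 0) {
            s += base + 1;                    // shift into [0, base]
          }
          if (s < 0 || s > base) {
            throw new AssertionError("start index out of range: " + s);
          }
        }
        System.out.println("start index always lands in [0, " + base + "]");
      }
    }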
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b70c0c29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 6438466..ba33db4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -133,7 +133,7 @@ public class DatanodeManager {
    * writing to stale datanodes, i.e., continue using stale nodes for writing.
    */
   private final float ratioUseStaleDataNodesForWrite;
-  
+
   /** The number of stale DataNodes */
   private volatile int numStaleNodes;
 
@@ -141,6 +141,11 @@ public class DatanodeManager {
   private volatile int numStaleStorages;
 
   /**
+   * Number of blocks to check for each postponedMisreplicatedBlocks iteration
+   */
+  private final long blocksPerPostponedMisreplicatedBlocksRescan;
+
+  /**
    * Whether or not this cluster has ever consisted of more than 1 rack,
    * according to the NetworkTopology.
    */
@@ -259,6 +264,9 @@ public class DatanodeManager {
     this.timeBetweenResendingCachingDirectivesMs = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
         DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT);
+    this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
+        DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
   }
 
   private static long getStaleIntervalFromConf(Configuration conf,
@@ -1132,6 +1140,10 @@ public class DatanodeManager {
             * ratioUseStaleDataNodesForWrite);
   }
 
+  public long getBlocksPerPostponedMisreplicatedBlocksRescan() {
+    return blocksPerPostponedMisreplicatedBlocksRescan;
+  }
+
   /**
    * @return The time interval used to mark DataNodes as stale.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b70c0c29/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 83840db..51d9ed6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2253,4 +2253,12 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.blocks.per.postponedblocks.rescan</name>
+  <value>10000</value>
+  <description>Number of blocks to rescan for each iteration of
+    postponedMisreplicatedBlocks.
+  </description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b70c0c29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 2755b29..fccd308 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -238,6 +238,14 @@ public class BlockManagerTestUtil {
     return dn.updateStorage(s);
   }
 
+  /**
+   * Force a rescan of postponedMisreplicatedBlocks in the given BlockManager
+   * @param bm the BlockManager to manipulate
+   */
+  public static void rescanPostponedMisreplicatedBlocks(BlockManager bm) {
+    bm.rescanPostponedMisreplicatedBlocks();
+  }
+
   public static DatanodeDescriptor getLocalDatanodeDescriptor(
       boolean initializeStorage) {
     DatanodeDescriptor dn = new DatanodeDescriptor(DFSTestUtil.getLocalDatanodeID());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b70c0c29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
index 75d5b70..85864f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
@@ -165,7 +165,12 @@ public class TestDNFencing {
     
     banner("Metadata after nodes have all block-reported");
     doMetasave(nn2);
-    
+
+    // Force a rescan of postponedMisreplicatedBlocks.
+    BlockManager nn2BM = nn2.getNamesystem().getBlockManager();
+    BlockManagerTestUtil.checkHeartbeat(nn2BM);
+    BlockManagerTestUtil.rescanPostponedMisreplicatedBlocks(nn2BM);
+
     // The blocks should no longer be postponed.
     assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
     
@@ -251,7 +256,12 @@ public class TestDNFencing {
     
     banner("Metadata after nodes have all block-reported");
     doMetasave(nn2);
-    
+
+    // Force a rescan of postponedMisreplicatedBlocks.
+    BlockManager nn2BM = nn2.getNamesystem().getBlockManager();
+    BlockManagerTestUtil.checkHeartbeat(nn2BM);
+    BlockManagerTestUtil.rescanPostponedMisreplicatedBlocks(nn2BM);
+
     // The block should no longer be postponed.
     assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
     
@@ -347,6 +357,11 @@ public class TestDNFencing {
     banner("Metadata after nodes have all block-reported");
     doMetasave(nn2);
     
+    // Force a rescan of postponedMisreplicatedBlocks.
+    BlockManager nn2BM = nn2.getNamesystem().getBlockManager();
+    BlockManagerTestUtil.checkHeartbeat(nn2BM);
+    BlockManagerTestUtil.rescanPostponedMisreplicatedBlocks(nn2BM);
+
     // The block should no longer be postponed.
     assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b70c0c29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
index 93830c1..e7cba75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
@@ -109,6 +109,10 @@ public class TestDNFencingWithReplication {
     HAStressTestHarness harness = new HAStressTestHarness();
     harness.conf.setInt(
         DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    harness.conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
+    harness.conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
 
     final MiniDFSCluster cluster = harness.startCluster();
     try {