You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2016/12/01 18:12:06 UTC

hadoop git commit: HDFS-8674. Improve performance of postponed block scans. Contributed by Daryn Sharp.

Repository: hadoop
Updated Branches:
  refs/heads/trunk e0fa49234 -> 96c574927


HDFS-8674. Improve performance of postponed block scans. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/96c57492
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/96c57492
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/96c57492

Branch: refs/heads/trunk
Commit: 96c574927a600d15fab919df1fdc9e07887af6c5
Parents: e0fa492
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Dec 1 12:11:27 2016 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Dec 1 12:11:27 2016 -0600

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    | 79 ++++++--------------
 1 file changed, 24 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/96c57492/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1b744e7..e60703b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -30,6 +30,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -43,8 +44,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import javax.management.ObjectName;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -101,7 +100,6 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.util.FoldedTreeSet;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.namenode.CacheManager;
 
@@ -184,7 +182,6 @@ public class BlockManager implements BlockStatsMXBean {
   /** flag indicating whether replication queues have been initialized */
   private boolean initializedReplQueues;
 
-  private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
   private final long startupDelayBlockDeletionInMs;
   private final BlockReportLeaseManager blockReportLeaseManager;
   private ObjectName mxBeanName;
@@ -219,7 +216,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
   /** Used by metrics */
   public long getPostponedMisreplicatedBlocksCount() {
-    return postponedMisreplicatedBlocksCount.get();
+    return postponedMisreplicatedBlocks.size();
   }
   /** Used by metrics */
   public int getPendingDataNodeMessageCount() {
@@ -275,8 +272,10 @@ public class BlockManager implements BlockStatsMXBean {
    * notified of all block deletions that might have been pending
    * when the failover happened.
    */
-  private final LightWeightHashSet<Block> postponedMisreplicatedBlocks =
-      new LightWeightHashSet<>();
+  private final Set<Block> postponedMisreplicatedBlocks =
+      new LinkedHashSet<Block>();
+  private final int blocksPerPostpondedRescan;
+  private final ArrayList<Block> rescannedMisreplicatedBlocks;
 
   /**
    * Maps a StorageID to the set of blocks that are "extra" for this
@@ -378,7 +377,10 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
     this.blockIdManager = new BlockIdManager(this);
-
+    blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
+        datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
+    rescannedMisreplicatedBlocks =
+        new ArrayList<Block>(blocksPerPostpondedRescan);
     startupDelayBlockDeletionInMs = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
@@ -1613,9 +1615,7 @@ public class BlockManager implements BlockStatsMXBean {
 
 
   private void postponeBlock(Block blk) {
-    if (postponedMisreplicatedBlocks.add(blk)) {
-      postponedMisreplicatedBlocksCount.incrementAndGet();
-    }
+    postponedMisreplicatedBlocks.add(blk);
   }
   
   
@@ -2375,39 +2375,14 @@ public class BlockManager implements BlockStatsMXBean {
     if (getPostponedMisreplicatedBlocksCount() == 0) {
       return;
     }
-    long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
     namesystem.writeLock();
-    long startPostponedMisReplicatedBlocksCount =
-        getPostponedMisreplicatedBlocksCount();
+    long startTime = Time.monotonicNow();
+    long startSize = postponedMisreplicatedBlocks.size();
     try {
-      // blocksPerRescan is the configured number of blocks per rescan.
-      // Randomly select blocksPerRescan consecutive blocks from the HashSet
-      // when the number of blocks remaining is larger than blocksPerRescan.
-      // The reason we don't always pick the first blocksPerRescan blocks is to
-      // handle the case if for some reason some datanodes remain in
-      // content stale state for a long time and only impact the first
-      // blocksPerRescan blocks.
-      int i = 0;
-      long startIndex = 0;
-      long blocksPerRescan =
-          datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
-      long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
-      if (base > 0) {
-        startIndex = ThreadLocalRandom.current().nextLong() % (base+1);
-        if (startIndex < 0) {
-          startIndex += (base+1);
-        }
-      }
       Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
-      for (int tmp = 0; tmp < startIndex; tmp++) {
-        it.next();
-      }
-
-      for (;it.hasNext(); i++) {
+      for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
         Block b = it.next();
-        if (i >= blocksPerRescan) {
-          break;
-        }
+        it.remove();
 
         BlockInfo bi = getStoredBlock(b);
         if (bi == null) {
@@ -2416,8 +2391,6 @@ public class BlockManager implements BlockStatsMXBean {
                 "Postponed mis-replicated block " + b + " no longer found " +
                 "in block map.");
           }
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
           continue;
         }
         MisReplicationResult res = processMisReplicatedBlock(bi);
@@ -2425,20 +2398,19 @@ public class BlockManager implements BlockStatsMXBean {
           LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
               "Re-scanned block " + b + ", result is " + res);
         }
-        if (res != MisReplicationResult.POSTPONE) {
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
+        if (res == MisReplicationResult.POSTPONE) {
+          rescannedMisreplicatedBlocks.add(b);
         }
       }
     } finally {
-      long endPostponedMisReplicatedBlocksCount =
-          getPostponedMisreplicatedBlocksCount();
+      postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
+      rescannedMisreplicatedBlocks.clear();
+      long endSize = postponedMisreplicatedBlocks.size();
       namesystem.writeUnlock();
       LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
-          (Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) +
-          " msecs. " + endPostponedMisReplicatedBlocksCount +
-          " blocks are left. " + (startPostponedMisReplicatedBlocksCount -
-          endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
+          (Time.monotonicNow() - startTime) + " msecs. " +
+          endSize + " blocks are left. " +
+          (startSize - endSize) + " blocks were removed.");
     }
   }
   
@@ -4048,9 +4020,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Remove the block from pendingReconstruction and neededReconstruction
     pendingReconstruction.remove(block);
     neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
-    if (postponedMisreplicatedBlocks.remove(block)) {
-      postponedMisreplicatedBlocksCount.decrementAndGet();
-    }
+    postponedMisreplicatedBlocks.remove(block);
   }
 
   public BlockInfo getStoredBlock(Block block) {
@@ -4464,7 +4434,6 @@ public class BlockManager implements BlockStatsMXBean {
     invalidateBlocks.clear();
     datanodeManager.clearPendingQueues();
     postponedMisreplicatedBlocks.clear();
-    postponedMisreplicatedBlocksCount.set(0);
   };
 
   public static LocatedBlock newLocatedBlock(


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org