You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") whose target is not preserved in this text copy.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2016/12/01 18:12:06 UTC
hadoop git commit: HDFS-8674. Improve performance of postponed block scans. Contributed by Daryn Sharp.
Repository: hadoop
Updated Branches:
refs/heads/trunk e0fa49234 -> 96c574927
HDFS-8674. Improve performance of postponed block scans. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/96c57492
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/96c57492
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/96c57492
Branch: refs/heads/trunk
Commit: 96c574927a600d15fab919df1fdc9e07887af6c5
Parents: e0fa492
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Dec 1 12:11:27 2016 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Dec 1 12:11:27 2016 -0600
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 79 ++++++--------------
1 file changed, 24 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/96c57492/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1b744e7..e60703b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -30,6 +30,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -43,8 +44,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import javax.management.ObjectName;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -101,7 +100,6 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
@@ -184,7 +182,6 @@ public class BlockManager implements BlockStatsMXBean {
/** flag indicating whether replication queues have been initialized */
private boolean initializedReplQueues;
- private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
private final long startupDelayBlockDeletionInMs;
private final BlockReportLeaseManager blockReportLeaseManager;
private ObjectName mxBeanName;
@@ -219,7 +216,7 @@ public class BlockManager implements BlockStatsMXBean {
}
/** Used by metrics */
public long getPostponedMisreplicatedBlocksCount() {
- return postponedMisreplicatedBlocksCount.get();
+ return postponedMisreplicatedBlocks.size();
}
/** Used by metrics */
public int getPendingDataNodeMessageCount() {
@@ -275,8 +272,10 @@ public class BlockManager implements BlockStatsMXBean {
* notified of all block deletions that might have been pending
* when the failover happened.
*/
- private final LightWeightHashSet<Block> postponedMisreplicatedBlocks =
- new LightWeightHashSet<>();
+ private final Set<Block> postponedMisreplicatedBlocks =
+ new LinkedHashSet<Block>();
+ private final int blocksPerPostpondedRescan;
+ private final ArrayList<Block> rescannedMisreplicatedBlocks;
/**
* Maps a StorageID to the set of blocks that are "extra" for this
@@ -378,7 +377,10 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
this.blockIdManager = new BlockIdManager(this);
-
+ blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
+ datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
+ rescannedMisreplicatedBlocks =
+ new ArrayList<Block>(blocksPerPostpondedRescan);
startupDelayBlockDeletionInMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
@@ -1613,9 +1615,7 @@ public class BlockManager implements BlockStatsMXBean {
private void postponeBlock(Block blk) {
- if (postponedMisreplicatedBlocks.add(blk)) {
- postponedMisreplicatedBlocksCount.incrementAndGet();
- }
+ postponedMisreplicatedBlocks.add(blk);
}
@@ -2375,39 +2375,14 @@ public class BlockManager implements BlockStatsMXBean {
if (getPostponedMisreplicatedBlocksCount() == 0) {
return;
}
- long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
namesystem.writeLock();
- long startPostponedMisReplicatedBlocksCount =
- getPostponedMisreplicatedBlocksCount();
+ long startTime = Time.monotonicNow();
+ long startSize = postponedMisreplicatedBlocks.size();
try {
- // blocksPerRescan is the configured number of blocks per rescan.
- // Randomly select blocksPerRescan consecutive blocks from the HashSet
- // when the number of blocks remaining is larger than blocksPerRescan.
- // The reason we don't always pick the first blocksPerRescan blocks is to
- // handle the case if for some reason some datanodes remain in
- // content stale state for a long time and only impact the first
- // blocksPerRescan blocks.
- int i = 0;
- long startIndex = 0;
- long blocksPerRescan =
- datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
- long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
- if (base > 0) {
- startIndex = ThreadLocalRandom.current().nextLong() % (base+1);
- if (startIndex < 0) {
- startIndex += (base+1);
- }
- }
Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
- for (int tmp = 0; tmp < startIndex; tmp++) {
- it.next();
- }
-
- for (;it.hasNext(); i++) {
+ for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
Block b = it.next();
- if (i >= blocksPerRescan) {
- break;
- }
+ it.remove();
BlockInfo bi = getStoredBlock(b);
if (bi == null) {
@@ -2416,8 +2391,6 @@ public class BlockManager implements BlockStatsMXBean {
"Postponed mis-replicated block " + b + " no longer found " +
"in block map.");
}
- it.remove();
- postponedMisreplicatedBlocksCount.decrementAndGet();
continue;
}
MisReplicationResult res = processMisReplicatedBlock(bi);
@@ -2425,20 +2398,19 @@ public class BlockManager implements BlockStatsMXBean {
LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
"Re-scanned block " + b + ", result is " + res);
}
- if (res != MisReplicationResult.POSTPONE) {
- it.remove();
- postponedMisreplicatedBlocksCount.decrementAndGet();
+ if (res == MisReplicationResult.POSTPONE) {
+ rescannedMisreplicatedBlocks.add(b);
}
}
} finally {
- long endPostponedMisReplicatedBlocksCount =
- getPostponedMisreplicatedBlocksCount();
+ postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
+ rescannedMisreplicatedBlocks.clear();
+ long endSize = postponedMisreplicatedBlocks.size();
namesystem.writeUnlock();
LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
- (Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) +
- " msecs. " + endPostponedMisReplicatedBlocksCount +
- " blocks are left. " + (startPostponedMisReplicatedBlocksCount -
- endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
+ (Time.monotonicNow() - startTime) + " msecs. " +
+ endSize + " blocks are left. " +
+ (startSize - endSize) + " blocks were removed.");
}
}
@@ -4048,9 +4020,7 @@ public class BlockManager implements BlockStatsMXBean {
// Remove the block from pendingReconstruction and neededReconstruction
pendingReconstruction.remove(block);
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
- if (postponedMisreplicatedBlocks.remove(block)) {
- postponedMisreplicatedBlocksCount.decrementAndGet();
- }
+ postponedMisreplicatedBlocks.remove(block);
}
public BlockInfo getStoredBlock(Block block) {
@@ -4464,7 +4434,6 @@ public class BlockManager implements BlockStatsMXBean {
invalidateBlocks.clear();
datanodeManager.clearPendingQueues();
postponedMisreplicatedBlocks.clear();
- postponedMisreplicatedBlocksCount.set(0);
};
public static LocatedBlock newLocatedBlock(
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org