You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2016/03/17 00:54:18 UTC
[2/2] hadoop git commit: HDFS-9857. Erasure Coding: Rename
replication-based names in BlockManager to more generic [part-1]. Contributed
by Rakesh R.
HDFS-9857. Erasure Coding: Rename replication-based names in BlockManager to more generic [part-1]. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/32d043d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/32d043d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/32d043d9
Branch: refs/heads/trunk
Commit: 32d043d9c5f4615058ea4f65a58ba271ba47fcb5
Parents: 605fdcb
Author: Zhe Zhang <ze...@zezhang-ld1.linkedin.biz>
Authored: Wed Mar 16 16:53:58 2016 -0700
Committer: Zhe Zhang <ze...@zezhang-ld1.linkedin.biz>
Committed: Wed Mar 16 16:53:58 2016 -0700
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 268 +++++------
.../blockmanagement/DecommissionManager.java | 30 +-
.../blockmanagement/LowRedundancyBlocks.java | 458 +++++++++++++++++++
.../blockmanagement/UnderReplicatedBlocks.java | 448 ------------------
.../blockmanagement/BlockManagerTestUtil.java | 2 +-
.../blockmanagement/TestBlockManager.java | 20 +-
.../TestLowRedundancyBlockQueues.java | 182 ++++++++
.../blockmanagement/TestPendingReplication.java | 14 +-
.../blockmanagement/TestReplicationPolicy.java | 158 +++----
.../TestUnderReplicatedBlockQueues.java | 182 --------
.../hdfs/server/namenode/TestMetaSave.java | 2 +-
11 files changed, 891 insertions(+), 873 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6ed102c..66ab789 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -149,7 +149,7 @@ public class BlockManager implements BlockStatsMXBean {
private volatile long pendingReplicationBlocksCount = 0L;
private volatile long corruptReplicaBlocksCount = 0L;
- private volatile long underReplicatedBlocksCount = 0L;
+ private volatile long lowRedundancyBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
/** flag indicating whether replication queues have been initialized */
@@ -166,7 +166,7 @@ public class BlockManager implements BlockStatsMXBean {
}
/** Used by metrics */
public long getUnderReplicatedBlocksCount() {
- return underReplicatedBlocksCount;
+ return lowRedundancyBlocksCount;
}
/** Used by metrics */
public long getCorruptReplicaBlocksCount() {
@@ -250,9 +250,10 @@ public class BlockManager implements BlockStatsMXBean {
/**
* Store set of Blocks that need to be replicated 1 or more times.
- * We also store pending replication-orders.
+ * We also store pending reconstruction-orders.
*/
- public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+ public final LowRedundancyBlocks neededReconstruction =
+ new LowRedundancyBlocks();
@VisibleForTesting
final PendingReplicationBlocks pendingReplications;
@@ -294,20 +295,20 @@ public class BlockManager implements BlockStatsMXBean {
private boolean shouldPostponeBlocksFromFuture = false;
/**
- * Process replication queues asynchronously to allow namenode safemode exit
- * and failover to be faster. HDFS-5496
+ * Process reconstruction queues asynchronously to allow namenode safemode
+ * exit and failover to be faster. HDFS-5496.
*/
- private Daemon replicationQueuesInitializer = null;
+ private Daemon reconstructionQueuesInitializer = null;
/**
- * Number of blocks to process asychronously for replication queues
+ * Number of blocks to process asynchronously for reconstruction queues
* initialization once acquired the namesystem lock. Remaining blocks will be
* processed again after acquiring lock again.
*/
private int numBlocksPerIteration;
/**
- * Progress of the Replication queues initialisation.
+ * Progress of the Reconstruction queues initialisation.
*/
- private double replicationQueuesInitProgress = 0.0;
+ private double reconstructionQueuesInitProgress = 0.0;
/** for block replicas placement */
private BlockPlacementPolicies placementPolicies;
@@ -576,12 +577,12 @@ public class BlockManager implements BlockStatsMXBean {
out.println("Live Datanodes: " + live.size());
out.println("Dead Datanodes: " + dead.size());
//
- // Dump contents of neededReplication
+ // Dump contents of neededReconstruction
//
- synchronized (neededReplications) {
- out.println("Metasave: Blocks waiting for replication: " +
- neededReplications.size());
- for (Block block : neededReplications) {
+ synchronized (neededReconstruction) {
+ out.println("Metasave: Blocks waiting for reconstruction: "
+ + neededReconstruction.size());
+ for (Block block : neededReconstruction) {
dumpBlockMeta(block, out);
}
}
@@ -616,7 +617,7 @@ public class BlockManager implements BlockStatsMXBean {
// source node returned is not used
chooseSourceDatanodes(getStoredBlock(block), containingNodes,
containingLiveReplicasNodes, numReplicas,
- new LinkedList<Byte>(), UnderReplicatedBlocks.LEVEL);
+ new LinkedList<Byte>(), LowRedundancyBlocks.LEVEL);
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
@@ -849,9 +850,9 @@ public class BlockManager implements BlockStatsMXBean {
// is happening
bc.convertLastBlockToUC(lastBlock, targets);
- // Remove block from replication queue.
+ // Remove block from reconstruction queue.
NumberReplicas replicas = countNodes(lastBlock);
- neededReplications.remove(lastBlock, replicas.liveReplicas(),
+ neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
replicas.readOnlyReplicas(),
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
pendingReplications.remove(lastBlock);
@@ -1365,8 +1366,8 @@ public class BlockManager implements BlockStatsMXBean {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node, numberOfReplicas);
} else if (isPopulatingReplQueues()) {
- // add the block to neededReplication
- updateNeededReplications(b.getStored(), -1, 0);
+ // add the block to neededReconstruction
+ updateNeededReconstructions(b.getStored(), -1, 0);
}
}
@@ -1418,13 +1419,13 @@ public class BlockManager implements BlockStatsMXBean {
void updateState() {
pendingReplicationBlocksCount = pendingReplications.size();
- underReplicatedBlocksCount = neededReplications.size();
+ lowRedundancyBlocksCount = neededReconstruction.size();
corruptReplicaBlocksCount = corruptReplicas.size();
}
- /** Return number of under-replicated but not missing blocks */
+ /** Return number of low redundancy blocks but not missing blocks. */
public int getUnderReplicatedNotMissingBlocks() {
- return neededReplications.getUnderReplicatedBlockCount();
+ return neededReconstruction.getLowRedundancyBlockCount();
}
/**
@@ -1452,25 +1453,26 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- * Scan blocks in {@link #neededReplications} and assign reconstruction
+ * Scan blocks in {@link #neededReconstruction} and assign reconstruction
* (replication or erasure coding) work to data-nodes they belong to.
*
* The number of processed blocks equals either twice the number of live
- * data-nodes or the number of under-replicated blocks whichever is less.
+ * data-nodes or the number of low redundancy blocks whichever is less.
*
- * @return number of blocks scheduled for replication during this iteration.
+ * @return number of blocks scheduled for reconstruction during this
+ * iteration.
*/
int computeBlockReconstructionWork(int blocksToProcess) {
- List<List<BlockInfo>> blocksToReplicate = null;
+ List<List<BlockInfo>> blocksToReconstruct = null;
namesystem.writeLock();
try {
- // Choose the blocks to be replicated
- blocksToReplicate = neededReplications
- .chooseUnderReplicatedBlocks(blocksToProcess);
+ // Choose the blocks to be reconstructed
+ blocksToReconstruct = neededReconstruction
+ .chooseLowRedundancyBlocks(blocksToProcess);
} finally {
namesystem.writeUnlock();
}
- return computeReconstructionWorkForBlocks(blocksToReplicate);
+ return computeReconstructionWorkForBlocks(blocksToReconstruct);
}
/**
@@ -1489,7 +1491,7 @@ public class BlockManager implements BlockStatsMXBean {
// Step 1: categorize at-risk blocks into replication and EC tasks
namesystem.writeLock();
try {
- synchronized (neededReplications) {
+ synchronized (neededReconstruction) {
for (int priority = 0; priority < blocksToReconstruct
.size(); priority++) {
for (BlockInfo block : blocksToReconstruct.get(priority)) {
@@ -1533,7 +1535,7 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
- synchronized (neededReplications) {
+ synchronized (neededReconstruction) {
if (validateReconstructionWork(rw)) {
scheduledWork++;
}
@@ -1544,7 +1546,7 @@ public class BlockManager implements BlockStatsMXBean {
}
if (blockLog.isDebugEnabled()) {
- // log which blocks have been scheduled for replication
+ // log which blocks have been scheduled for reconstruction
for(BlockReconstructionWork rw : reconWork){
DatanodeStorageInfo[] targets = rw.getTargets();
if (targets != null && targets.length != 0) {
@@ -1558,8 +1560,9 @@ public class BlockManager implements BlockStatsMXBean {
}
}
- blockLog.debug("BLOCK* neededReplications = {} pendingReplications = {}",
- neededReplications.size(), pendingReplications.size());
+ blockLog.debug(
+ "BLOCK* neededReconstruction = {} pendingReplications = {}",
+ neededReconstruction.size(), pendingReplications.size());
}
return scheduledWork;
@@ -1576,8 +1579,8 @@ public class BlockManager implements BlockStatsMXBean {
int priority) {
// skip abandoned block or block reopened for append
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
- // remove from neededReplications
- neededReplications.remove(block, priority);
+ // remove from neededReconstruction
+ neededReconstruction.remove(block, priority);
return null;
}
@@ -1605,8 +1608,8 @@ public class BlockManager implements BlockStatsMXBean {
int pendingNum = pendingReplications.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) {
- neededReplications.remove(block, priority);
- blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+ neededReconstruction.remove(block, priority);
+ blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
" it has enough replicas", block);
return null;
}
@@ -1662,7 +1665,7 @@ public class BlockManager implements BlockStatsMXBean {
// Recheck since global lock was released
// skip abandoned block or block reopened for append
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
- neededReplications.remove(block, priority);
+ neededReconstruction.remove(block, priority);
rw.resetTargets();
return false;
}
@@ -1673,7 +1676,7 @@ public class BlockManager implements BlockStatsMXBean {
final int pendingNum = pendingReplications.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) {
- neededReplications.remove(block, priority);
+ neededReconstruction.remove(block, priority);
rw.resetTargets();
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
@@ -1705,9 +1708,9 @@ public class BlockManager implements BlockStatsMXBean {
+ "pendingReplications", block);
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
- // remove from neededReplications
+ // remove from neededReconstruction
if(numEffectiveReplicas + targets.length >= requiredReplication) {
- neededReplications.remove(block, priority);
+ neededReconstruction.remove(block, priority);
}
return true;
}
@@ -1852,7 +1855,7 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
- if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
+ if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
&& !node.isDecommissionInProgress()
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
continue; // already reached replication limit
@@ -1905,9 +1908,10 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
NumberReplicas num = countNodes(timedOutItems[i]);
- if (isNeededReplication(bi, num.liveReplicas())) {
- neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(),
- num.decommissionedAndDecommissioning(), getReplication(bi));
+ if (isNeededReconstruction(bi, num.liveReplicas())) {
+ neededReconstruction.add(bi, num.liveReplicas(),
+ num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
+ getReplication(bi));
}
}
} finally {
@@ -2777,7 +2781,7 @@ public class BlockManager implements BlockStatsMXBean {
* intended for use with initial block report at startup. If not in startup
* safe mode, will call standard addStoredBlock(). Assumes this method is
* called "immediately" so there is no need to refresh the storedBlock from
- * blocksMap. Doesn't handle underReplication/overReplication, or worry about
+ * blocksMap. Doesn't handle low redundancy/extra redundancy, or worry about
* pendingReplications or corruptReplicas, because it's in startup safe mode.
* Doesn't log every block, because there are typically millions of them.
*
@@ -2812,7 +2816,7 @@ public class BlockManager implements BlockStatsMXBean {
/**
* Modify (block-->datanode) map. Remove block from set of
- * needed replications if this takes care of the problem.
+ * needed reconstruction if this takes care of the problem.
* @return the block that is stored in blocksMap.
*/
private Block addStoredBlock(final BlockInfo block,
@@ -2890,24 +2894,25 @@ public class BlockManager implements BlockStatsMXBean {
return storedBlock;
}
- // do not try to handle over/under-replicated blocks during first safe mode
+ // do not try to handle extra/low redundancy blocks during first safe mode
if (!isPopulatingReplQueues()) {
return storedBlock;
}
- // handle underReplication/overReplication
+ // handle low redundancy/extra redundancy
short fileReplication = getExpectedReplicaNum(storedBlock);
- if (!isNeededReplication(storedBlock, numCurrentReplica)) {
- neededReplications.remove(storedBlock, numCurrentReplica,
+ if (!isNeededReconstruction(storedBlock, numCurrentReplica)) {
+ neededReconstruction.remove(storedBlock, numCurrentReplica,
num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), fileReplication);
} else {
- updateNeededReplications(storedBlock, curReplicaDelta, 0);
+ updateNeededReconstructions(storedBlock, curReplicaDelta, 0);
}
- if (shouldProcessOverReplicated(num, fileReplication)) {
- processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
+ if (shouldProcessExtraRedundancy(num, fileReplication)) {
+ processExtraRedundancyBlock(storedBlock, fileReplication, node,
+ delNodeHint);
}
- // If the file replication has reached desired value
+ // If the file redundancy has reached desired value
// we can remove any corrupt replicas the block may have
int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
int numCorruptNodes = num.corruptReplicas();
@@ -2922,7 +2927,7 @@ public class BlockManager implements BlockStatsMXBean {
return storedBlock;
}
- private boolean shouldProcessOverReplicated(NumberReplicas num,
+ private boolean shouldProcessExtraRedundancy(NumberReplicas num,
int expectedNum) {
final int numCurrent = num.liveReplicas();
return numCurrent > expectedNum ||
@@ -2972,42 +2977,44 @@ public class BlockManager implements BlockStatsMXBean {
/**
* For each block in the name-node verify whether it belongs to any file,
- * over or under replicated. Place it into the respective queue.
+ * extra or low redundancy. Place it into the respective queue.
*/
public void processMisReplicatedBlocks() {
assert namesystem.hasWriteLock();
- stopReplicationInitializer();
- neededReplications.clear();
- replicationQueuesInitializer = new Daemon() {
+ stopReconstructionInitializer();
+ neededReconstruction.clear();
+ reconstructionQueuesInitializer = new Daemon() {
@Override
public void run() {
try {
processMisReplicatesAsync();
} catch (InterruptedException ie) {
- LOG.info("Interrupted while processing replication queues.");
+ LOG.info("Interrupted while processing reconstruction queues.");
} catch (Exception e) {
- LOG.error("Error while processing replication queues async", e);
+ LOG.error("Error while processing reconstruction queues async", e);
}
}
};
- replicationQueuesInitializer.setName("Replication Queue Initializer");
- replicationQueuesInitializer.start();
+ reconstructionQueuesInitializer
+ .setName("Reconstruction Queue Initializer");
+ reconstructionQueuesInitializer.start();
}
/*
- * Stop the ongoing initialisation of replication queues
+ * Stop the ongoing initialisation of reconstruction queues
*/
- private void stopReplicationInitializer() {
- if (replicationQueuesInitializer != null) {
- replicationQueuesInitializer.interrupt();
+ private void stopReconstructionInitializer() {
+ if (reconstructionQueuesInitializer != null) {
+ reconstructionQueuesInitializer.interrupt();
try {
- replicationQueuesInitializer.join();
+ reconstructionQueuesInitializer.join();
} catch (final InterruptedException e) {
- LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
+ LOG.warn("Interrupted while waiting for "
+ + "reconstructionQueueInitializer. Returning..");
return;
} finally {
- replicationQueuesInitializer = null;
+ reconstructionQueuesInitializer = null;
}
}
}
@@ -3025,7 +3032,7 @@ public class BlockManager implements BlockStatsMXBean {
long startTimeMisReplicatedScan = Time.monotonicNow();
Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
long totalBlocks = blocksMap.size();
- replicationQueuesInitProgress = 0;
+ reconstructionQueuesInitProgress = 0;
long totalProcessed = 0;
long sleepDuration =
Math.max(1, Math.min(numBlocksPerIteration/1000, 10000));
@@ -3067,7 +3074,7 @@ public class BlockManager implements BlockStatsMXBean {
totalProcessed += processed;
// there is a possibility that if any of the blocks deleted/added during
// initialisation, then progress might be different.
- replicationQueuesInitProgress = Math.min((double) totalProcessed
+ reconstructionQueuesInitProgress = Math.min((double) totalProcessed
/ totalBlocks, 1.0);
if (!blocksItr.hasNext()) {
@@ -3097,12 +3104,12 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- * Get the progress of the Replication queues initialisation
+ * Get the progress of the reconstruction queues initialisation
*
* @return Returns values between 0 and 1 for the progress.
*/
- public double getReplicationQueuesInitProgress() {
- return replicationQueuesInitProgress;
+ public double getReconstructionQueuesInitProgress() {
+ return reconstructionQueuesInitProgress;
}
/**
@@ -3134,15 +3141,16 @@ public class BlockManager implements BlockStatsMXBean {
short expectedReplication = getExpectedReplicaNum(block);
NumberReplicas num = countNodes(block);
final int numCurrentReplica = num.liveReplicas();
- // add to under-replicated queue if need to be
- if (isNeededReplication(block, numCurrentReplica)) {
- if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
- num.decommissionedAndDecommissioning(), expectedReplication)) {
+ // add to low redundancy queue if need to be
+ if (isNeededReconstruction(block, numCurrentReplica)) {
+ if (neededReconstruction.add(block, numCurrentReplica,
+ num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
+ expectedReplication)) {
return MisReplicationResult.UNDER_REPLICATED;
}
}
- if (shouldProcessOverReplicated(num, expectedReplication)) {
+ if (shouldProcessExtraRedundancy(num, expectedReplication)) {
if (num.replicasOnStaleNodes() > 0) {
// If any of the replicas of this block are on nodes that are
// considered "stale", then these replicas may in fact have
@@ -3152,8 +3160,8 @@ public class BlockManager implements BlockStatsMXBean {
return MisReplicationResult.POSTPONE;
}
- // over-replicated block
- processOverReplicatedBlock(block, expectedReplication, null, null);
+ // extra redundancy block
+ processExtraRedundancyBlock(block, expectedReplication, null, null);
return MisReplicationResult.OVER_REPLICATED;
}
@@ -3167,12 +3175,12 @@ public class BlockManager implements BlockStatsMXBean {
return;
}
- // update needReplication priority queues
+ // update neededReconstruction priority queues
b.setReplication(newRepl);
- updateNeededReplications(b, 0, newRepl - oldRepl);
+ updateNeededReconstructions(b, 0, newRepl - oldRepl);
if (oldRepl > newRepl) {
- processOverReplicatedBlock(b, newRepl, null, null);
+ processExtraRedundancyBlock(b, newRepl, null, null);
}
}
@@ -3181,7 +3189,7 @@ public class BlockManager implements BlockStatsMXBean {
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
*/
- private void processOverReplicatedBlock(final BlockInfo block,
+ private void processExtraRedundancyBlock(final BlockInfo block,
final short replication, final DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
assert namesystem.hasWriteLock();
@@ -3405,7 +3413,7 @@ public class BlockManager implements BlockStatsMXBean {
//
if (!storedBlock.isDeleted()) {
bmSafeMode.decrementSafeBlockCount(storedBlock);
- updateNeededReplications(storedBlock, -1, 0);
+ updateNeededReconstructions(storedBlock, -1, 0);
}
excessReplicas.remove(node, storedBlock);
@@ -3748,29 +3756,29 @@ public class BlockManager implements BlockStatsMXBean {
/**
* On stopping decommission, check if the node has excess replicas.
- * If there are any excess replicas, call processOverReplicatedBlock().
- * Process over replicated blocks only when active NN is out of safe mode.
+ * If there are any excess replicas, call processExtraRedundancyBlock().
+ * Process extra redundancy blocks only when active NN is out of safe mode.
*/
- void processOverReplicatedBlocksOnReCommission(
+ void processExtraRedundancyBlocksOnReCommission(
final DatanodeDescriptor srcNode) {
if (!isPopulatingReplQueues()) {
return;
}
final Iterator<BlockInfo> it = srcNode.getBlockIterator();
- int numOverReplicated = 0;
+ int numExtraRedundancy = 0;
while(it.hasNext()) {
final BlockInfo block = it.next();
int expectedReplication = this.getReplication(block);
NumberReplicas num = countNodes(block);
- if (shouldProcessOverReplicated(num, expectedReplication)) {
- // over-replicated block
- processOverReplicatedBlock(block, (short) expectedReplication, null,
+ if (shouldProcessExtraRedundancy(num, expectedReplication)) {
+ // extra redundancy block
+ processExtraRedundancyBlock(block, (short) expectedReplication, null,
null);
- numOverReplicated++;
+ numExtraRedundancy++;
}
}
- LOG.info("Invalidated " + numOverReplicated + " over-replicated blocks on " +
- srcNode + " during recommissioning");
+ LOG.info("Invalidated " + numExtraRedundancy
+ + " extra redundancy blocks on " + srcNode + " during recommissioning");
}
/**
@@ -3789,9 +3797,9 @@ public class BlockManager implements BlockStatsMXBean {
updateState();
if (pendingReplicationBlocksCount == 0 &&
- underReplicatedBlocksCount == 0) {
- LOG.info("Node {} is dead and there are no under-replicated" +
- " blocks or blocks pending replication. Safe to decommission.",
+ lowRedundancyBlocksCount == 0) {
+ LOG.info("Node {} is dead and there are no low redundancy" +
+ " blocks or blocks pending reconstruction. Safe to decommission.",
node);
return true;
}
@@ -3835,9 +3843,9 @@ public class BlockManager implements BlockStatsMXBean {
block.setNumBytes(BlockCommand.NO_ACK);
addToInvalidates(block);
removeBlockFromMap(block);
- // Remove the block from pendingReplications and neededReplications
+ // Remove the block from pendingReplications and neededReconstruction
pendingReplications.remove(block);
- neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
+ neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
if (postponedMisreplicatedBlocks.remove(block)) {
postponedMisreplicatedBlocksCount.decrementAndGet();
}
@@ -3859,8 +3867,8 @@ public class BlockManager implements BlockStatsMXBean {
new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
}
- /** updates a block in under replication queue */
- private void updateNeededReplications(final BlockInfo block,
+ /** updates a block in needed reconstruction queue. */
+ private void updateNeededReconstructions(final BlockInfo block,
final int curReplicasDelta, int expectedReplicasDelta) {
namesystem.writeLock();
try {
@@ -3869,14 +3877,14 @@ public class BlockManager implements BlockStatsMXBean {
}
NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block);
- if (isNeededReplication(block, repl.liveReplicas())) {
- neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
- repl.decommissionedAndDecommissioning(), curExpectedReplicas,
- curReplicasDelta, expectedReplicasDelta);
+ if (isNeededReconstruction(block, repl.liveReplicas())) {
+ neededReconstruction.update(block, repl.liveReplicas(),
+ repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
+ curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
} else {
int oldReplicas = repl.liveReplicas()-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
- neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
+ neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(),
repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
}
} finally {
@@ -3885,10 +3893,10 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- * Check replication of the blocks in the collection.
- * If any block is needed replication, insert it into the replication queue.
+ * Check sufficient redundancy of the blocks in the collection. If any block
+ * is needed reconstruction, insert it into the reconstruction queue.
* Otherwise, if the block is more than the expected replication factor,
- * process it as an over replicated block.
+ * process it as an extra redundancy block.
*/
public void checkReplication(BlockCollection bc) {
for (BlockInfo block : bc.getBlocks()) {
@@ -3896,11 +3904,11 @@ public class BlockManager implements BlockStatsMXBean {
final NumberReplicas n = countNodes(block);
final int pending = pendingReplications.getNumReplicas(block);
if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
- neededReplications.add(block, n.liveReplicas() + pending,
+ neededReconstruction.add(block, n.liveReplicas() + pending,
n.readOnlyReplicas(),
n.decommissionedAndDecommissioning(), expected);
- } else if (shouldProcessOverReplicated(n, expected)) {
- processOverReplicatedBlock(block, expected, null, null);
+ } else if (shouldProcessExtraRedundancy(n, expected)) {
+ processExtraRedundancyBlock(block, expected, null, null);
}
}
}
@@ -3926,7 +3934,7 @@ public class BlockManager implements BlockStatsMXBean {
try {
// blocks should not be replicated or removed if safe mode is on
if (namesystem.isInSafeMode()) {
- LOG.debug("In safemode, not computing replication work");
+ LOG.debug("In safemode, not computing reconstruction work");
return 0;
}
try {
@@ -3980,10 +3988,10 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- * A block needs replication if the number of replicas is less than expected
- * or if it does not have enough racks.
+ * A block needs reconstruction if the number of replicas is less than
+ * expected or if it does not have enough racks.
*/
- boolean isNeededReplication(BlockInfo storedBlock, int current) {
+ boolean isNeededReconstruction(BlockInfo storedBlock, int current) {
int expected = getExpectedReplicaNum(storedBlock);
return storedBlock.isComplete()
&& (current < expected || !isPlacementPolicySatisfied(storedBlock));
@@ -3997,12 +4005,12 @@ public class BlockManager implements BlockStatsMXBean {
public long getMissingBlocksCount() {
// not locking
- return this.neededReplications.getCorruptBlockSize();
+ return this.neededReconstruction.getCorruptBlockSize();
}
public long getMissingReplOneBlocksCount() {
// not locking
- return this.neededReplications.getCorruptReplOneBlockSize();
+ return this.neededReconstruction.getCorruptReplOneBlockSize();
}
public BlockInfo addBlockCollection(BlockInfo block,
@@ -4050,8 +4058,8 @@ public class BlockManager implements BlockStatsMXBean {
* Return an iterator over the set of blocks for which there are no replicas.
*/
public Iterator<BlockInfo> getCorruptReplicaBlockIterator() {
- return neededReplications.iterator(
- UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+ return neededReconstruction.iterator(
+ LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
/**
@@ -4070,7 +4078,7 @@ public class BlockManager implements BlockStatsMXBean {
/** @return the size of UnderReplicatedBlocks */
public int numOfUnderReplicatedBlocks() {
- return neededReplications.size();
+ return neededReconstruction.size();
}
/**
@@ -4232,7 +4240,7 @@ public class BlockManager implements BlockStatsMXBean {
* this NameNode.
*/
public void clearQueues() {
- neededReplications.clear();
+ neededReconstruction.clear();
pendingReplications.clear();
excessReplicas.clear();
invalidateBlocks.clear();
@@ -4298,7 +4306,7 @@ public class BlockManager implements BlockStatsMXBean {
}
public void shutdown() {
- stopReplicationInitializer();
+ stopReconstructionInitializer();
blocksMap.close();
MBeans.unregister(mxBeanName);
mxBeanName = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 480670a..3b5f103 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -215,10 +215,10 @@ public class DecommissionManager {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.stopDecommission(node);
- // Over-replicated blocks will be detected and processed when
+ // extra redundancy blocks will be detected and processed when
// the dead node comes back and send in its full block report.
if (node.isAlive()) {
- blockManager.processOverReplicatedBlocksOnReCommission(node);
+ blockManager.processExtraRedundancyBlocksOnReCommission(node);
}
// Remove from tracking in DecommissionManager
pendingNodes.remove(node);
@@ -513,9 +513,9 @@ public class DecommissionManager {
final List<BlockInfo> insufficientList,
boolean pruneReliableBlocks) {
boolean firstReplicationLog = true;
- int underReplicatedBlocks = 0;
+ int lowRedundancyBlocks = 0;
int decommissionOnlyReplicas = 0;
- int underReplicatedInOpenFiles = 0;
+ int lowRedundancyInOpenFiles = 0;
while (it.hasNext()) {
numBlocksChecked++;
final BlockInfo block = it.next();
@@ -537,22 +537,22 @@ public class DecommissionManager {
final NumberReplicas num = blockManager.countNodes(block);
final int liveReplicas = num.liveReplicas();
- // Schedule under-replicated blocks for replication if not already
+ // Schedule low redundancy blocks for reconstruction if not already
// pending
- if (blockManager.isNeededReplication(block, liveReplicas)) {
- if (!blockManager.neededReplications.contains(block) &&
+ if (blockManager.isNeededReconstruction(block, liveReplicas)) {
+ if (!blockManager.neededReconstruction.contains(block) &&
blockManager.pendingReplications.getNumReplicas(block) == 0 &&
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
- blockManager.neededReplications.add(block,
+ blockManager.neededReconstruction.add(block,
liveReplicas, num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(),
blockManager.getExpectedReplicaNum(block));
}
}
- // Even if the block is under-replicated,
- // it doesn't block decommission if it's sufficiently replicated
+ // Even if the block is without sufficient redundancy,
+ // it doesn't block decommission if has sufficient redundancy
if (isSufficient(block, bc, num)) {
if (pruneReliableBlocks) {
it.remove();
@@ -560,7 +560,7 @@ public class DecommissionManager {
continue;
}
- // We've found an insufficiently replicated block.
+ // We've found a block without sufficient redundancy.
if (insufficientList != null) {
insufficientList.add(block);
}
@@ -571,18 +571,18 @@ public class DecommissionManager {
firstReplicationLog = false;
}
// Update various counts
- underReplicatedBlocks++;
+ lowRedundancyBlocks++;
if (bc.isUnderConstruction()) {
- underReplicatedInOpenFiles++;
+ lowRedundancyInOpenFiles++;
}
if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
decommissionOnlyReplicas++;
}
}
- datanode.decommissioningStatus.set(underReplicatedBlocks,
+ datanode.decommissioningStatus.set(lowRedundancyBlocks,
decommissionOnlyReplicas,
- underReplicatedInOpenFiles);
+ lowRedundancyInOpenFiles);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
new file mode 100644
index 0000000..de8cf4e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+
+/**
+ * Keep prioritized queues of low-redundancy blocks.
+ * Blocks have redundancy priority, with priority
+ * {@link #QUEUE_HIGHEST_PRIORITY} indicating the highest priority.
+ * </p>
+ * Having a prioritised queue allows the {@link BlockManager} to select
+ * which blocks to replicate first -it tries to give priority to data
+ * that is most at risk or considered most valuable.
+ *
+ * <p/>
+ * The policy for choosing which priority to give added blocks
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}.
+ * </p>
+ * <p>The queue order is as follows:</p>
+ * <ol>
+ * <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that should be redundant
+ * first. That is blocks with only one copy, or blocks with zero live
+ * copies but a copy in a node being decommissioned. These blocks
+ * are at risk of loss if the disk or server on which they
+ * remain fails.</li>
+ * <li>{@link #QUEUE_VERY_LOW_REDUNDANCY}: blocks that are very
+ * under-replicated compared to their expected values. Currently
+ * that means the ratio of actual:expected is
+ * <i>less than</i> 1:3. These blocks may not be at risk,
+ * but they are clearly considered "important".</li>
+ * <li>{@link #QUEUE_LOW_REDUNDANCY}: blocks that are also under
+ * replicated, and the ratio of actual:expected is good enough that
+ * they do not need to go into the {@link #QUEUE_VERY_LOW_REDUNDANCY}
+ * queue.</li>
+ * <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are at least as
+ * many copies of a block as required, but the blocks are not adequately
+ * distributed. Loss of a rack/switch could take all copies off-line.</li>
+ * <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt
+ * and for which there are no non-corrupt copies (currently) available.
+ * The policy here is to keep those corrupt blocks replicated, but give
+ * blocks that are not corrupt higher priority.</li>
+ * </ol>
+ */
+class LowRedundancyBlocks implements Iterable<BlockInfo> {
+ /** The total number of queues : {@value} */
+ static final int LEVEL = 5;
+ /** The queue with the highest priority: {@value} */
+ static final int QUEUE_HIGHEST_PRIORITY = 0;
+ /** The queue for blocks that are way below their expected value : {@value} */
+ static final int QUEUE_VERY_LOW_REDUNDANCY = 1;
+ /**
+ * The queue for blocks that are "normally" without sufficient redundancy : {@value}.
+ */
+ static final int QUEUE_LOW_REDUNDANCY = 2;
+ /** The queue for blocks that have the right number of replicas,
+ * but which the block manager felt were badly distributed: {@value}
+ */
+ static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
+ /** The queue for corrupt blocks: {@value} */
+ static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
+ /** the queues themselves */
+ private final List<LightWeightLinkedSet<BlockInfo>> priorityQueues
+ = new ArrayList<>(LEVEL);
+
+ /** The number of corrupt blocks with replication factor 1 */
+ private int corruptReplOneBlocks = 0;
+
+ /** Create an object. */
+ LowRedundancyBlocks() {
+ for (int i = 0; i < LEVEL; i++) {
+ priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
+ }
+ }
+
+ /**
+ * Empty the queues.
+ */
+ synchronized void clear() {
+ for (int i = 0; i < LEVEL; i++) {
+ priorityQueues.get(i).clear();
+ }
+ corruptReplOneBlocks = 0;
+ }
+
+ /** Return the total number of insufficient redundancy blocks. */
+ synchronized int size() {
+ int size = 0;
+ for (int i = 0; i < LEVEL; i++) {
+ size += priorityQueues.get(i).size();
+ }
+ return size;
+ }
+
+ /**
+ * Return the number of insufficiently redundant blocks excluding corrupt
+ * blocks.
+ */
+ synchronized int getLowRedundancyBlockCount() {
+ int size = 0;
+ for (int i = 0; i < LEVEL; i++) {
+ if (i != QUEUE_WITH_CORRUPT_BLOCKS) {
+ size += priorityQueues.get(i).size();
+ }
+ }
+ return size;
+ }
+
+ /** Return the number of corrupt blocks */
+ synchronized int getCorruptBlockSize() {
+ return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
+ }
+
+ /** Return the number of corrupt blocks with replication factor 1 */
+ synchronized int getCorruptReplOneBlockSize() {
+ return corruptReplOneBlocks;
+ }
+
+ /** Check if a block is in the neededReconstruction queue. */
+ synchronized boolean contains(BlockInfo block) {
+ for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) {
+ if (set.contains(block)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** Return the priority of a block
+ * @param curReplicas current number of replicas of the block
+ * @param expectedReplicas expected number of replicas of the block
+ * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
+ */
+ private int getPriority(BlockInfo block,
+ int curReplicas,
+ int readOnlyReplicas,
+ int decommissionedReplicas,
+ int expectedReplicas) {
+ assert curReplicas >= 0 : "Negative replicas!";
+ if (curReplicas >= expectedReplicas) {
+ // Block has enough copies, but not enough racks
+ return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
+ }
+ if (block.isStriped()) {
+ BlockInfoStriped sblk = (BlockInfoStriped) block;
+ return getPriorityStriped(curReplicas, decommissionedReplicas,
+ sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
+ } else {
+ return getPriorityContiguous(curReplicas, readOnlyReplicas,
+ decommissionedReplicas, expectedReplicas);
+ }
+ }
+
+ private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
+ int decommissionedReplicas, int expectedReplicas) {
+ if (curReplicas == 0) {
+ // If there are zero non-decommissioned replicas but there are
+ // some decommissioned replicas, then assign them highest priority
+ if (decommissionedReplicas > 0) {
+ return QUEUE_HIGHEST_PRIORITY;
+ }
+ if (readOnlyReplicas > 0) {
+ // only has read-only replicas, highest risk
+ // since the read-only replicas may go down all together.
+ return QUEUE_HIGHEST_PRIORITY;
+ }
+ //all we have are corrupt blocks
+ return QUEUE_WITH_CORRUPT_BLOCKS;
+ } else if (curReplicas == 1) {
+ // only one replica, highest risk of loss
+ // highest priority
+ return QUEUE_HIGHEST_PRIORITY;
+ } else if ((curReplicas * 3) < expectedReplicas) {
+ //there is less than a third as many replicas as expected;
+ //this is considered a very insufficiently redundant block.
+ return QUEUE_VERY_LOW_REDUNDANCY;
+ } else {
+ //add to the normal queue for insufficiently redundant blocks
+ return QUEUE_LOW_REDUNDANCY;
+ }
+ }
+
+ private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+ short dataBlkNum, short parityBlkNum) {
+ if (curReplicas < dataBlkNum) {
+ // There are some replicas on decommissioned nodes so it's not corrupted
+ if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+ return QUEUE_HIGHEST_PRIORITY;
+ }
+ return QUEUE_WITH_CORRUPT_BLOCKS;
+ } else if (curReplicas == dataBlkNum) {
+ // highest risk of loss, highest priority
+ return QUEUE_HIGHEST_PRIORITY;
+ } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
+ // can only afford one replica loss
+ // this is considered very insufficiently redundant blocks.
+ return QUEUE_VERY_LOW_REDUNDANCY;
+ } else {
+ // add to the normal queue for insufficiently redundant blocks.
+ return QUEUE_LOW_REDUNDANCY;
+ }
+ }
+
+ /**
+ * Add a block to insufficiently redundant queue according to its priority.
+ *
+ * @param block a low redundancy block
+ * @param curReplicas current number of replicas of the block
+ * @param decomissionedReplicas the number of decommissioned replicas
+ * @param expectedReplicas expected number of replicas of the block
+ * @return true if the block was added to a queue.
+ */
+ synchronized boolean add(BlockInfo block,
+ int curReplicas,
+ int readOnlyReplicas,
+ int decomissionedReplicas,
+ int expectedReplicas) {
+ assert curReplicas >= 0 : "Negative replicas!";
+ final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
+ decomissionedReplicas, expectedReplicas);
+ if(priorityQueues.get(priLevel).add(block)) {
+ if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
+ expectedReplicas == 1) {
+ corruptReplOneBlocks++;
+ }
+ NameNode.blockStateChangeLog.debug(
+ "BLOCK* NameSystem.LowRedundancyBlock.add: {}"
+ + " has only {} replicas and need {} replicas so is added to"
+ + " neededReconstructions at priority level {}",
+ block, curReplicas, expectedReplicas, priLevel);
+
+ return true;
+ }
+ return false;
+ }
+
+ /** Remove a block from a low redundancy queue. */
+ synchronized boolean remove(BlockInfo block,
+ int oldReplicas,
+ int oldReadOnlyReplicas,
+ int decommissionedReplicas,
+ int oldExpectedReplicas) {
+ final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
+ decommissionedReplicas, oldExpectedReplicas);
+ boolean removedBlock = remove(block, priLevel);
+ if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
+ oldExpectedReplicas == 1 &&
+ removedBlock) {
+ corruptReplOneBlocks--;
+ assert corruptReplOneBlocks >= 0 :
+ "Number of corrupt blocks with replication factor 1 " +
+ "should be non-negative";
+ }
+ return removedBlock;
+ }
+
+ /**
+ * Remove a block from the low redundancy queues.
+ *
+ * The priLevel parameter is a hint of which queue to query
+ * first: if negative or >= {@link #LEVEL} this shortcutting
+ * is not attempted.
+ *
+ * If the block is not found in the nominated queue, an attempt is made to
+ * remove it from all queues.
+ *
+ * <i>Warning:</i> This is not a synchronized method.
+ * @param block block to remove
+ * @param priLevel expected priority level
+ * @return true if the block was found and removed from one of the priority
+ * queues
+ */
+ boolean remove(BlockInfo block, int priLevel) {
+ if(priLevel >= 0 && priLevel < LEVEL
+ && priorityQueues.get(priLevel).remove(block)) {
+ NameNode.blockStateChangeLog.debug(
+ "BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block {}"
+ + " from priority queue {}",
+ block, priLevel);
+ return true;
+ } else {
+ // Try to remove the block from all queues if the block was
+ // not found in the queue for the given priority level.
+ for (int i = 0; i < LEVEL; i++) {
+ if (i != priLevel && priorityQueues.get(i).remove(block)) {
+ NameNode.blockStateChangeLog.debug(
+ "BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block" +
+ " {} from priority queue {}", block, i);
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Recalculate and potentially update the priority level of a block.
+ *
+ * If the block priority has changed from before an attempt is made to
+ * remove it from the block queue. Regardless of whether or not the block
+ * is in the block queue of the (recalculated) priority, an attempt is made
+ * to add it to that queue. This ensures that the block will be
+ * in its expected priority queue (and only that queue) by the end of the
+ * method call.
+ * @param block a low redundancy block
+ * @param curReplicas current number of replicas of the block
+ * @param decommissionedReplicas the number of decommissioned replicas
+ * @param curExpectedReplicas expected number of replicas of the block
+ * @param curReplicasDelta the change in the replicate count from before
+ * @param expectedReplicasDelta the change in the expected replica count
+ * from before
+ */
+ synchronized void update(BlockInfo block, int curReplicas,
+ int readOnlyReplicas, int decommissionedReplicas,
+ int curExpectedReplicas,
+ int curReplicasDelta, int expectedReplicasDelta) {
+ int oldReplicas = curReplicas-curReplicasDelta;
+ int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+ int curPri = getPriority(block, curReplicas, readOnlyReplicas,
+ decommissionedReplicas, curExpectedReplicas);
+ int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
+ decommissionedReplicas, oldExpectedReplicas);
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("LowRedundancyBlocks.update " +
+ block +
+ " curReplicas " + curReplicas +
+ " curExpectedReplicas " + curExpectedReplicas +
+ " oldReplicas " + oldReplicas +
+ " oldExpectedReplicas " + oldExpectedReplicas +
+ " curPri " + curPri +
+ " oldPri " + oldPri);
+ }
+ if(oldPri != curPri) {
+ remove(block, oldPri);
+ }
+ if(priorityQueues.get(curPri).add(block)) {
+ NameNode.blockStateChangeLog.debug(
+ "BLOCK* NameSystem.LowRedundancyBlock.update: {} has only {} "
+ + "replicas and needs {} replicas so is added to "
+ + "neededReconstructions at priority level {}",
+ block, curReplicas, curExpectedReplicas, curPri);
+
+ }
+ if (oldPri != curPri || expectedReplicasDelta != 0) {
+ // corruptReplOneBlocks could possibly change
+ if (curPri == QUEUE_WITH_CORRUPT_BLOCKS &&
+ curExpectedReplicas == 1) {
+ // add a new corrupt block with replication factor 1
+ corruptReplOneBlocks++;
+ } else if (oldPri == QUEUE_WITH_CORRUPT_BLOCKS &&
+ curExpectedReplicas - expectedReplicasDelta == 1) {
+ // remove an existing corrupt block with replication factor 1
+ corruptReplOneBlocks--;
+ }
+ }
+ }
+
+ /**
+ * Get a list of block lists without sufficient redundancy. The index of
+ * block lists represents its replication priority. Iterates each block list
+ * in priority order beginning with the highest priority list. Iterators use
+ * a bookmark to resume where the previous iteration stopped. Returns when
+ * the block count is met or iteration reaches the end of the lowest priority
+ * list, in which case bookmarks for each block list are reset to the heads
+ * of their respective lists.
+ *
+ * @param blocksToProcess - number of blocks to fetch from low redundancy
+ * blocks.
+ * @return Return a list of block lists to be replicated. The block list
+ * index represents its redundancy priority.
+ */
+ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
+ int blocksToProcess) {
+ final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
+
+ int count = 0;
+ int priority = 0;
+ for (; count < blocksToProcess && priority < LEVEL; priority++) {
+ if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
+ // do not choose corrupted blocks.
+ continue;
+ }
+
+ // Go through all blocks that need reconstructions with current priority.
+ // Set the iterator to the first unprocessed block at this priority level
+ final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
+ final List<BlockInfo> blocks = new LinkedList<>();
+ blocksToReconstruct.add(blocks);
+ // Loop through all remaining blocks in the list.
+ for(; count < blocksToProcess && i.hasNext(); count++) {
+ blocks.add(i.next());
+ }
+ }
+
+ if (priority == LEVEL) {
+ // Reset all bookmarks because there were no recently added blocks.
+ for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
+ q.resetBookmark();
+ }
+ }
+
+ return blocksToReconstruct;
+ }
+
+ /** Returns an iterator of all blocks in a given priority queue. */
+ synchronized Iterator<BlockInfo> iterator(int level) {
+ return priorityQueues.get(level).iterator();
+ }
+
+ /** Return an iterator of all the low redundancy blocks. */
+ @Override
+ public synchronized Iterator<BlockInfo> iterator() {
+ final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
+ return new Iterator<BlockInfo>() {
+ private Iterator<BlockInfo> b = q.next().iterator();
+
+ @Override
+ public BlockInfo next() {
+ hasNext();
+ return b.next();
+ }
+
+ @Override
+ public boolean hasNext() {
+ for(; !b.hasNext() && q.hasNext(); ) {
+ b = q.next().iterator();
+ }
+ return b.hasNext();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
deleted file mode 100644
index 5e8f7ed..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
-
-/**
- * Keep prioritized queues of under replicated blocks.
- * Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY}
- * indicating the highest priority.
- * </p>
- * Having a prioritised queue allows the {@link BlockManager} to select
- * which blocks to replicate first -it tries to give priority to data
- * that is most at risk or considered most valuable.
- *
- * <p/>
- * The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}.
- * </p>
- * <p>The queue order is as follows:</p>
- * <ol>
- * <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated
- * first. That is blocks with only one copy, or blocks with zero live
- * copies but a copy in a node being decommissioned. These blocks
- * are at risk of loss if the disk or server on which they
- * remain fails.</li>
- * <li>{@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very
- * under-replicated compared to their expected values. Currently
- * that means the ratio of the ratio of actual:expected means that
- * there is <i>less than</i> 1:3.</li>. These blocks may not be at risk,
- * but they are clearly considered "important".
- * <li>{@link #QUEUE_UNDER_REPLICATED}: blocks that are also under
- * replicated, and the ratio of actual:expected is good enough that
- * they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED}
- * queue.</li>
- * <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are as least as
- * many copies of a block as required, but the blocks are not adequately
- * distributed. Loss of a rack/switch could take all copies off-line.</li>
- * <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt
- * and for which there are no-non-corrupt copies (currently) available.
- * The policy here is to keep those corrupt blocks replicated, but give
- * blocks that are not corrupt higher priority.</li>
- * </ol>
- */
-class UnderReplicatedBlocks implements Iterable<BlockInfo> {
- /** The total number of queues : {@value} */
- static final int LEVEL = 5;
- /** The queue with the highest priority: {@value} */
- static final int QUEUE_HIGHEST_PRIORITY = 0;
- /** The queue for blocks that are way below their expected value : {@value} */
- static final int QUEUE_VERY_UNDER_REPLICATED = 1;
- /** The queue for "normally" under-replicated blocks: {@value} */
- static final int QUEUE_UNDER_REPLICATED = 2;
- /** The queue for blocks that have the right number of replicas,
- * but which the block manager felt were badly distributed: {@value}
- */
- static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
- /** The queue for corrupt blocks: {@value} */
- static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
- /** the queues themselves */
- private final List<LightWeightLinkedSet<BlockInfo>> priorityQueues
- = new ArrayList<>(LEVEL);
-
- /** The number of corrupt blocks with replication factor 1 */
- private int corruptReplOneBlocks = 0;
-
- /** Create an object. */
- UnderReplicatedBlocks() {
- for (int i = 0; i < LEVEL; i++) {
- priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
- }
- }
-
- /**
- * Empty the queues.
- */
- synchronized void clear() {
- for (int i = 0; i < LEVEL; i++) {
- priorityQueues.get(i).clear();
- }
- corruptReplOneBlocks = 0;
- }
-
- /** Return the total number of under replication blocks */
- synchronized int size() {
- int size = 0;
- for (int i = 0; i < LEVEL; i++) {
- size += priorityQueues.get(i).size();
- }
- return size;
- }
-
- /** Return the number of under replication blocks excluding corrupt blocks */
- synchronized int getUnderReplicatedBlockCount() {
- int size = 0;
- for (int i = 0; i < LEVEL; i++) {
- if (i != QUEUE_WITH_CORRUPT_BLOCKS) {
- size += priorityQueues.get(i).size();
- }
- }
- return size;
- }
-
- /** Return the number of corrupt blocks */
- synchronized int getCorruptBlockSize() {
- return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
- }
-
- /** Return the number of corrupt blocks with replication factor 1 */
- synchronized int getCorruptReplOneBlockSize() {
- return corruptReplOneBlocks;
- }
-
- /** Check if a block is in the neededReplication queue */
- synchronized boolean contains(BlockInfo block) {
- for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) {
- if (set.contains(block)) {
- return true;
- }
- }
- return false;
- }
-
- /** Return the priority of a block
- * @param curReplicas current number of replicas of the block
- * @param expectedReplicas expected number of replicas of the block
- * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
- */
- private int getPriority(BlockInfo block,
- int curReplicas,
- int readOnlyReplicas,
- int decommissionedReplicas,
- int expectedReplicas) {
- assert curReplicas >= 0 : "Negative replicas!";
- if (curReplicas >= expectedReplicas) {
- // Block has enough copies, but not enough racks
- return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
- }
- if (block.isStriped()) {
- BlockInfoStriped sblk = (BlockInfoStriped) block;
- return getPriorityStriped(curReplicas, decommissionedReplicas,
- sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
- } else {
- return getPriorityContiguous(curReplicas, readOnlyReplicas,
- decommissionedReplicas, expectedReplicas);
- }
- }
-
- private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
- int decommissionedReplicas, int expectedReplicas) {
- if (curReplicas == 0) {
- // If there are zero non-decommissioned replicas but there are
- // some decommissioned replicas, then assign them highest priority
- if (decommissionedReplicas > 0) {
- return QUEUE_HIGHEST_PRIORITY;
- }
- if (readOnlyReplicas > 0) {
- // only has read-only replicas, highest risk
- // since the read-only replicas may go down all together.
- return QUEUE_HIGHEST_PRIORITY;
- }
- //all we have are corrupt blocks
- return QUEUE_WITH_CORRUPT_BLOCKS;
- } else if (curReplicas == 1) {
- // only one replica, highest risk of loss
- // highest priority
- return QUEUE_HIGHEST_PRIORITY;
- } else if ((curReplicas * 3) < expectedReplicas) {
- //there is less than a third as many blocks as requested;
- //this is considered very under-replicated
- return QUEUE_VERY_UNDER_REPLICATED;
- } else {
- //add to the normal queue for under replicated blocks
- return QUEUE_UNDER_REPLICATED;
- }
- }
-
- private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
- short dataBlkNum, short parityBlkNum) {
- if (curReplicas < dataBlkNum) {
- // There are some replicas on decommissioned nodes so it's not corrupted
- if (curReplicas + decommissionedReplicas >= dataBlkNum) {
- return QUEUE_HIGHEST_PRIORITY;
- }
- return QUEUE_WITH_CORRUPT_BLOCKS;
- } else if (curReplicas == dataBlkNum) {
- // highest risk of loss, highest priority
- return QUEUE_HIGHEST_PRIORITY;
- } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
- // there is less than a third as many blocks as requested;
- // this is considered very under-replicated
- return QUEUE_VERY_UNDER_REPLICATED;
- } else {
- // add to the normal queue for under replicated blocks
- return QUEUE_UNDER_REPLICATED;
- }
- }
-
- /** add a block to a under replication queue according to its priority
- * @param block a under replication block
- * @param curReplicas current number of replicas of the block
- * @param decomissionedReplicas the number of decommissioned replicas
- * @param expectedReplicas expected number of replicas of the block
- * @return true if the block was added to a queue.
- */
- synchronized boolean add(BlockInfo block,
- int curReplicas,
- int readOnlyReplicas,
- int decomissionedReplicas,
- int expectedReplicas) {
- assert curReplicas >= 0 : "Negative replicas!";
- final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
- decomissionedReplicas, expectedReplicas);
- if(priorityQueues.get(priLevel).add(block)) {
- if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
- expectedReplicas == 1) {
- corruptReplOneBlocks++;
- }
- NameNode.blockStateChangeLog.debug(
- "BLOCK* NameSystem.UnderReplicationBlock.add: {}"
- + " has only {} replicas and need {} replicas so is added to" +
- " neededReplications at priority level {}", block, curReplicas,
- expectedReplicas, priLevel);
-
- return true;
- }
- return false;
- }
-
- /** remove a block from a under replication queue */
- synchronized boolean remove(BlockInfo block,
- int oldReplicas,
- int oldReadOnlyReplicas,
- int decommissionedReplicas,
- int oldExpectedReplicas) {
- final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
- decommissionedReplicas, oldExpectedReplicas);
- boolean removedBlock = remove(block, priLevel);
- if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
- oldExpectedReplicas == 1 &&
- removedBlock) {
- corruptReplOneBlocks--;
- assert corruptReplOneBlocks >= 0 :
- "Number of corrupt blocks with replication factor 1 " +
- "should be non-negative";
- }
- return removedBlock;
- }
-
- /**
- * Remove a block from the under replication queues.
- *
- * The priLevel parameter is a hint of which queue to query
- * first: if negative or >= {@link #LEVEL} this shortcutting
- * is not attmpted.
- *
- * If the block is not found in the nominated queue, an attempt is made to
- * remove it from all queues.
- *
- * <i>Warning:</i> This is not a synchronized method.
- * @param block block to remove
- * @param priLevel expected privilege level
- * @return true if the block was found and removed from one of the priority queues
- */
- boolean remove(BlockInfo block, int priLevel) {
- if(priLevel >= 0 && priLevel < LEVEL
- && priorityQueues.get(priLevel).remove(block)) {
- NameNode.blockStateChangeLog.debug(
- "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
- " from priority queue {}", block, priLevel);
- return true;
- } else {
- // Try to remove the block from all queues if the block was
- // not found in the queue for the given priority level.
- for (int i = 0; i < LEVEL; i++) {
- if (i != priLevel && priorityQueues.get(i).remove(block)) {
- NameNode.blockStateChangeLog.debug(
- "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
- " {} from priority queue {}", block, i);
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Recalculate and potentially update the priority level of a block.
- *
- * If the block priority has changed from before an attempt is made to
- * remove it from the block queue. Regardless of whether or not the block
- * is in the block queue of (recalculate) priority, an attempt is made
- * to add it to that queue. This ensures that the block will be
- * in its expected priority queue (and only that queue) by the end of the
- * method call.
- * @param block a under replicated block
- * @param curReplicas current number of replicas of the block
- * @param decommissionedReplicas the number of decommissioned replicas
- * @param curExpectedReplicas expected number of replicas of the block
- * @param curReplicasDelta the change in the replicate count from before
- * @param expectedReplicasDelta the change in the expected replica count from before
- */
- synchronized void update(BlockInfo block, int curReplicas,
- int readOnlyReplicas, int decommissionedReplicas,
- int curExpectedReplicas,
- int curReplicasDelta, int expectedReplicasDelta) {
- int oldReplicas = curReplicas-curReplicasDelta;
- int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
- int curPri = getPriority(block, curReplicas, readOnlyReplicas,
- decommissionedReplicas, curExpectedReplicas);
- int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
- decommissionedReplicas, oldExpectedReplicas);
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
- block +
- " curReplicas " + curReplicas +
- " curExpectedReplicas " + curExpectedReplicas +
- " oldReplicas " + oldReplicas +
- " oldExpectedReplicas " + oldExpectedReplicas +
- " curPri " + curPri +
- " oldPri " + oldPri);
- }
- if(oldPri != curPri) {
- remove(block, oldPri);
- }
- if(priorityQueues.get(curPri).add(block)) {
- NameNode.blockStateChangeLog.debug(
- "BLOCK* NameSystem.UnderReplicationBlock.update: {} has only {} " +
- "replicas and needs {} replicas so is added to " +
- "neededReplications at priority level {}", block, curReplicas,
- curExpectedReplicas, curPri);
-
- }
- if (oldPri != curPri || expectedReplicasDelta != 0) {
- // corruptReplOneBlocks could possibly change
- if (curPri == QUEUE_WITH_CORRUPT_BLOCKS &&
- curExpectedReplicas == 1) {
- // add a new corrupt block with replication factor 1
- corruptReplOneBlocks++;
- } else if (oldPri == QUEUE_WITH_CORRUPT_BLOCKS &&
- curExpectedReplicas - expectedReplicasDelta == 1) {
- // remove an existing corrupt block with replication factor 1
- corruptReplOneBlocks--;
- }
- }
- }
-
- /**
- * Get a list of block lists to be replicated. The index of block lists
- * represents its replication priority. Iterates each block list in priority
- * order beginning with the highest priority list. Iterators use a bookmark to
- * resume where the previous iteration stopped. Returns when the block count
- * is met or iteration reaches the end of the lowest priority list, in which
- * case bookmarks for each block list are reset to the heads of their
- * respective lists.
- *
- * @param blocksToProcess - number of blocks to fetch from underReplicated
- * blocks.
- * @return Return a list of block lists to be replicated. The block list index
- * represents its replication priority.
- */
- synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
- int blocksToProcess) {
- final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
-
- int count = 0;
- int priority = 0;
- for (; count < blocksToProcess && priority < LEVEL; priority++) {
- if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
- // do not choose corrupted blocks.
- continue;
- }
-
- // Go through all blocks that need replications with current priority.
- // Set the iterator to the first unprocessed block at this priority level.
- final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
- final List<BlockInfo> blocks = new LinkedList<>();
- blocksToReplicate.add(blocks);
- // Loop through all remaining blocks in the list.
- for(; count < blocksToProcess && i.hasNext(); count++) {
- blocks.add(i.next());
- }
- }
-
- if (priority == LEVEL) {
- // Reset all bookmarks because there were no recently added blocks.
- for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
- q.resetBookmark();
- }
- }
-
- return blocksToReplicate;
- }
-
- /** returns an iterator of all blocks in a given priority queue */
- synchronized Iterator<BlockInfo> iterator(int level) {
- return priorityQueues.get(level).iterator();
- }
-
- /** return an iterator of all the under replication blocks */
- @Override
- public synchronized Iterator<BlockInfo> iterator() {
- final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
- return new Iterator<BlockInfo>() {
- private Iterator<BlockInfo> b = q.next().iterator();
-
- @Override
- public BlockInfo next() {
- hasNext();
- return b.next();
- }
-
- @Override
- public boolean hasNext() {
- for(; !b.hasNext() && q.hasNext(); ) {
- b = q.next().iterator();
- }
- return b.hasNext();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index c0a4fdb..1b565ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -74,7 +74,7 @@ public class BlockManagerTestUtil {
final BlockInfo storedBlock = bm.getStoredBlock(b);
return new int[]{getNumberOfRacks(bm, b),
bm.countNodes(storedBlock).liveReplicas(),
- bm.neededReplications.contains(storedBlock) ? 1 : 0};
+ bm.neededReconstruction.contains(storedBlock) ? 1 : 0};
} finally {
namesystem.readUnlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 5511b99..3a974e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -397,20 +397,20 @@ public class TestBlockManager {
addNodes(nodes);
List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);
for (int i = 0; i < NUM_TEST_ITERS; i++) {
- doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes);
+ doTestSingleRackClusterHasSufficientRedundancy(i, origNodes);
}
}
- private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
+ private void doTestSingleRackClusterHasSufficientRedundancy(int testIndex,
List<DatanodeDescriptor> origNodes)
throws Exception {
assertEquals(0, bm.numOfUnderReplicatedBlocks());
BlockInfo block = addBlockOnNodes(testIndex, origNodes);
- assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block)));
+ assertFalse(bm.isNeededReconstruction(block, bm.countLiveNodes(block)));
}
@Test(timeout = 60000)
- public void testNeededReplicationWhileAppending() throws IOException {
+ public void testNeededReconstructionWhileAppending() throws IOException {
Configuration conf = new HdfsConfiguration();
String src = "/test-file";
Path file = new Path(src);
@@ -449,7 +449,7 @@ public class TestBlockManager {
namenode.updatePipeline(clientName, oldBlock, newBlock,
newLocatedBlock.getLocations(), newLocatedBlock.getStorageIDs());
BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock());
- assertFalse(bm.isNeededReplication(bi, bm.countLiveNodes(bi)));
+ assertFalse(bm.isNeededReconstruction(bi, bm.countLiveNodes(bi)));
} finally {
IOUtils.closeStream(out);
}
@@ -601,7 +601,7 @@ public class TestBlockManager {
liveNodes,
new NumberReplicas(),
new ArrayList<Byte>(),
- UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
+ LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
assertEquals("Does not choose a source node for a less-than-highest-priority"
+ " replication since all available source nodes have reached"
@@ -612,7 +612,7 @@ public class TestBlockManager {
liveNodes,
new NumberReplicas(),
new ArrayList<Byte>(),
- UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length);
+ LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY).length);
// Increase the replication count to test replication count > hard limit
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
@@ -626,7 +626,7 @@ public class TestBlockManager {
liveNodes,
new NumberReplicas(),
new ArrayList<Byte>(),
- UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length);
+ LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length);
}
@Test
@@ -652,7 +652,7 @@ public class TestBlockManager {
cntNodes,
liveNodes,
new NumberReplicas(), new LinkedList<Byte>(),
- UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]);
+ LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY)[0]);
// Increase the replication count to test replication count > hard limit
@@ -666,7 +666,7 @@ public class TestBlockManager {
cntNodes,
liveNodes,
new NumberReplicas(), new LinkedList<Byte>(),
- UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length);
+ LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY).length);
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
new file mode 100644
index 0000000..2eb7abf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class TestLowRedundancyBlockQueues {
+
+ private final ErasureCodingPolicy ecPolicy =
+ ErasureCodingPolicyManager.getSystemDefaultPolicy();
+
+ private BlockInfo genBlockInfo(long id) {
+ return new BlockInfoContiguous(new Block(id), (short) 3);
+ }
+
+ private BlockInfo genStripedBlockInfo(long id, long numBytes) {
+ BlockInfoStriped sblk = new BlockInfoStriped(new Block(id), ecPolicy);
+ sblk.setNumBytes(numBytes);
+ return sblk;
+ }
+
+ /**
+ * Test that adding blocks with different replication counts puts them
+ * into different queues
+ * @throws Throwable if something goes wrong
+ */
+ @Test
+ public void testBlockPriorities() throws Throwable {
+ LowRedundancyBlocks queues = new LowRedundancyBlocks();
+ BlockInfo block1 = genBlockInfo(1);
+ BlockInfo block2 = genBlockInfo(2);
+ BlockInfo block_very_low_redundancy = genBlockInfo(3);
+ BlockInfo block_corrupt = genBlockInfo(4);
+ BlockInfo block_corrupt_repl_one = genBlockInfo(5);
+
+ //add a block with a single entry
+ assertAdded(queues, block1, 1, 0, 3);
+
+ assertEquals(1, queues.getLowRedundancyBlockCount());
+ assertEquals(1, queues.size());
+ assertInLevel(queues, block1, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+ //repeated additions fail
+ assertFalse(queues.add(block1, 1, 0, 0, 3));
+
+ //add a second block with two replicas
+ assertAdded(queues, block2, 2, 0, 3);
+ assertEquals(2, queues.getLowRedundancyBlockCount());
+ assertEquals(2, queues.size());
+ assertInLevel(queues, block2, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
+ //now try to add a block that is corrupt
+ assertAdded(queues, block_corrupt, 0, 0, 3);
+ assertEquals(3, queues.size());
+ assertEquals(2, queues.getLowRedundancyBlockCount());
+ assertEquals(1, queues.getCorruptBlockSize());
+ assertInLevel(queues, block_corrupt,
+ LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+
+ //insert a block with very low redundancy
+ assertAdded(queues, block_very_low_redundancy, 4, 0, 25);
+ assertInLevel(queues, block_very_low_redundancy,
+ LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
+
+ //insert a corrupt block with replication factor 1
+ assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
+ assertEquals(2, queues.getCorruptBlockSize());
+ assertEquals(1, queues.getCorruptReplOneBlockSize());
+ queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
+ assertEquals(0, queues.getCorruptReplOneBlockSize());
+ queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
+ assertEquals(1, queues.getCorruptReplOneBlockSize());
+ queues.update(block_very_low_redundancy, 0, 0, 0, 1, -4, -24);
+ assertEquals(2, queues.getCorruptReplOneBlockSize());
+ }
+
+ @Test
+ public void testStripedBlockPriorities() throws Throwable {
+ int dataBlkNum = ecPolicy.getNumDataUnits();
+ int parityBlkNUm = ecPolicy.getNumParityUnits();
+ doTestStripedBlockPriorities(1, parityBlkNUm);
+ doTestStripedBlockPriorities(dataBlkNum, parityBlkNUm);
+ }
+
+ private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
+ throws Throwable {
+ int groupSize = dataBlkNum + parityBlkNum;
+ long numBytes = ecPolicy.getCellSize() * dataBlkNum;
+ LowRedundancyBlocks queues = new LowRedundancyBlocks();
+ int numUR = 0;
+ int numCorrupt = 0;
+
+ // add low redundancy blocks
+ for (int i = 0; dataBlkNum + i < groupSize; i++) {
+ BlockInfo block = genStripedBlockInfo(-100 - 100 * i, numBytes);
+ assertAdded(queues, block, dataBlkNum + i, 0, groupSize);
+ numUR++;
+ assertEquals(numUR, queues.getLowRedundancyBlockCount());
+ assertEquals(numUR + numCorrupt, queues.size());
+ if (i == 0) {
+ assertInLevel(queues, block,
+ LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+ } else if (i * 3 < parityBlkNum + 1) {
+ assertInLevel(queues, block,
+ LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
+ } else {
+ assertInLevel(queues, block,
+ LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
+ }
+ }
+
+ // add a corrupted block
+ BlockInfo block_corrupt = genStripedBlockInfo(-10, numBytes);
+ assertEquals(numCorrupt, queues.getCorruptBlockSize());
+ assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
+ numCorrupt++;
+ assertEquals(numUR + numCorrupt, queues.size());
+ assertEquals(numUR, queues.getLowRedundancyBlockCount());
+ assertEquals(numCorrupt, queues.getCorruptBlockSize());
+ assertInLevel(queues, block_corrupt,
+ LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+ }
+
+ private void assertAdded(LowRedundancyBlocks queues,
+ BlockInfo block,
+ int curReplicas,
+ int decomissionedReplicas,
+ int expectedReplicas) {
+ assertTrue("Failed to add " + block,
+ queues.add(block,
+ curReplicas, 0,
+ decomissionedReplicas,
+ expectedReplicas));
+ }
+
+ /**
+ * Determine whether or not a block is in a level without changing the API.
+ * Instead get the per-level iterator and run through it looking for a match.
+ * If the block is not found, an assertion is thrown.
+ *
+ * This is inefficient, but this is only a test case.
+ * @param queues queues to scan
+ * @param block block to look for
+ * @param level level to select
+ */
+ private void assertInLevel(LowRedundancyBlocks queues,
+ Block block,
+ int level) {
+ final Iterator<BlockInfo> bi = queues.iterator(level);
+ while (bi.hasNext()) {
+ Block next = bi.next();
+ if (block.equals(next)) {
+ return;
+ }
+ }
+ fail("Block " + block + " not found in level " + level);
+ }
+}