Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/04/02 20:10:24 UTC
[46/50] [abbrv] hadoop git commit: HDFS-7907. Erasure Coding: track invalid, corrupt, and under-recovery striped blocks in NameNode. Contributed by Jing Zhao.
HDFS-7907. Erasure Coding: track invalid, corrupt, and under-recovery striped blocks in NameNode. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/019d211a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/019d211a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/019d211a
Branch: refs/heads/HDFS-7285
Commit: 019d211abc309a41c7fd50ff01cb50e3aa84ed68
Parents: c48643f
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Mar 30 11:25:09 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Thu Apr 2 11:06:11 2015 -0700
----------------------------------------------------------------------
.../blockmanagement/BlockInfoStriped.java | 25 ++-
.../server/blockmanagement/BlockManager.java | 203 ++++++++++---------
.../blockmanagement/DecommissionManager.java | 86 ++++----
.../hdfs/server/namenode/FSNamesystem.java | 8 +-
.../server/blockmanagement/TestNodeCount.java | 2 +-
.../TestOverReplicatedBlocks.java | 4 +-
6 files changed, 172 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/019d211a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 30b5ee7..4a85efb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+
import java.io.DataOutput;
import java.io.IOException;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
*
@@ -37,7 +39,6 @@ import java.io.IOException;
* array to record the block index for each triplet.
*/
public class BlockInfoStriped extends BlockInfo {
- private final int chunkSize = HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
private final short dataBlockNum;
private final short parityBlockNum;
/**
@@ -132,6 +133,22 @@ public class BlockInfoStriped extends BlockInfo {
return i == -1 ? -1 : indices[i];
}
+ /**
+ * Identify the block stored in the given datanode storage. Note that
+ * the returned block has the same block Id as the one seen/reported by
+ * the DataNode.
+ */
+ Block getBlockOnStorage(DatanodeStorageInfo storage) {
+ int index = getStorageBlockIndex(storage);
+ if (index < 0) {
+ return null;
+ } else {
+ Block block = new Block(this);
+ block.setBlockId(this.getBlockId() + index);
+ return block;
+ }
+ }
+
@Override
boolean removeStorage(DatanodeStorageInfo storage) {
int dnIndex = findStorageInfoFromEnd(storage);
@@ -186,8 +203,8 @@ public class BlockInfoStriped extends BlockInfo {
// For striped blocks, the total usage should be the total of data
// blocks and parity blocks, because `getNumBytes` only accounts for
// the actual data block size.
- return ((getNumBytes() - 1) / (dataBlockNum * chunkSize) + 1)
- * chunkSize * parityBlockNum + getNumBytes();
+ return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CHUNK_SIZE) + 1)
+ * BLOCK_STRIPED_CHUNK_SIZE * parityBlockNum + getNumBytes();
}
@Override
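As a side note on the arithmetic above, here is a small, self-contained sketch of the spaceConsumed() math. It is not part of this commit; the RS(6,3) schema and the 64 KB chunk size are assumed example values.

// Standalone sketch of the spaceConsumed() calculation above. The schema
// (6 data + 3 parity) and the 64 KB chunk size are assumed example values,
// not taken from this commit.
public class StripedSpaceSketch {
  public static void main(String[] args) {
    final long chunkSize = 64 * 1024;   // stands in for BLOCK_STRIPED_CHUNK_SIZE
    final short dataBlockNum = 6;
    final short parityBlockNum = 3;
    long numBytes = 1024 * 1024;        // 1 MB of actual data in the group

    // Round the data up to whole stripes; each stripe carries one parity
    // cell on every parity block.
    long stripes = (numBytes - 1) / (dataBlockNum * chunkSize) + 1;  // = 3
    long spaceConsumed = stripes * chunkSize * parityBlockNum + numBytes;
    System.out.println(spaceConsumed);  // 589824 parity bytes + 1048576 data bytes
  }
}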
http://git-wip-us.apache.org/repos/asf/hadoop/blob/019d211a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7de352b..5aed4d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -177,7 +177,11 @@ public class BlockManager {
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
- /** Blocks to be invalidated. */
+ /**
+ * Blocks to be invalidated.
+ * To invalidate a striped block, we need to track its individual internal
+ * blocks.
+ */
private final InvalidateBlocks invalidateBlocks;
/**
@@ -193,8 +197,8 @@ public class BlockManager {
* Maps a StorageID to the set of blocks that are "extra" for this
* DataNode. We'll eventually remove these extras.
*/
- public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
- new TreeMap<String, LightWeightLinkedSet<Block>>();
+ public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap =
+ new TreeMap<>();
/**
* Store set of Blocks that need to be replicated 1 or more times.
@@ -591,11 +595,11 @@ public class BlockManager {
((BlockInfoStriped) block).getDataBlockNum() : minReplication;
}
- public boolean checkMinStorage(BlockInfo block) {
+ public boolean hasMinStorage(BlockInfo block) {
return countNodes(block).liveReplicas() >= getMinStorageNum(block);
}
- public boolean checkMinStorage(BlockInfo block, int liveNum) {
+ public boolean hasMinStorage(BlockInfo block, int liveNum) {
return liveNum >= getMinStorageNum(block);
}
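For readers skimming the rename: a minimal model of what getMinStorageNum()/hasMinStorage() now distinguish. The values are assumptions (dfs.namenode.replication.min = 1, RS(6,3) schema); the real methods live on BlockManager.

// Minimal model of the striped-vs-contiguous minimum. A striped group is
// only minimally "stored" once all of its data blocks exist; a contiguous
// block needs just minReplication replicas. Values below are assumptions.
public class MinStorageSketch {
  static final int MIN_REPLICATION = 1; // assumed dfs.namenode.replication.min
  static final int DATA_BLOCK_NUM = 6;  // assumed RS(6,3) schema

  static int getMinStorageNum(boolean striped) {
    return striped ? DATA_BLOCK_NUM : MIN_REPLICATION;
  }

  static boolean hasMinStorage(boolean striped, int liveStorages) {
    return liveStorages >= getMinStorageNum(striped);
  }

  public static void main(String[] args) {
    System.out.println(hasMinStorage(false, 1)); // true
    System.out.println(hasMinStorage(true, 5));  // false: needs all 6 data blocks
    System.out.println(hasMinStorage(true, 6));  // true
  }
}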
@@ -640,7 +644,7 @@ public class BlockManager {
return false; // already completed (e.g. by syncBlock)
final boolean b = commitBlock(lastBlock, commitBlock);
- if (checkMinStorage(lastBlock)) {
+ if (hasMinStorage(lastBlock)) {
completeBlock(bc, bc.numBlocks() - 1, false);
}
return b;
@@ -664,7 +668,7 @@ public class BlockManager {
}
int numNodes = curBlock.numNodes();
- if (!force && !checkMinStorage(curBlock, numNodes)) {
+ if (!force && !hasMinStorage(curBlock, numNodes)) {
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
}
@@ -762,7 +766,7 @@ public class BlockManager {
// count in safe-mode.
namesystem.adjustSafeModeBlockTotals(
// decrement safe if we had enough
- checkMinStorage(oldBlock, targets.length) ? -1 : 0,
+ hasMinStorage(oldBlock, targets.length) ? -1 : 0,
// always decrement total blocks
-1);
@@ -1090,7 +1094,7 @@ public class BlockManager {
/** Remove the blocks associated to the given datanode. */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
- final Iterator<? extends Block> it = node.getBlockIterator();
+ final Iterator<BlockInfo> it = node.getBlockIterator();
while(it.hasNext()) {
removeStoredBlock(it.next(), node);
}
@@ -1104,10 +1108,10 @@ public class BlockManager {
/** Remove the blocks associated to the given DatanodeStorageInfo. */
void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
assert namesystem.hasWriteLock();
- final Iterator<? extends Block> it = storageInfo.getBlockIterator();
+ final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
while(it.hasNext()) {
- Block block = it.next();
+ BlockInfo block = it.next();
removeStoredBlock(block, node);
invalidateBlocks.remove(node, block);
}
@@ -1129,21 +1133,32 @@ public class BlockManager {
* Adds block to list of blocks which will be invalidated on all its
* datanodes.
*/
- private void addToInvalidates(Block b) {
+ private void addToInvalidates(BlockInfo storedBlock) {
if (!namesystem.isPopulatingReplQueues()) {
return;
}
StringBuilder datanodes = new StringBuilder();
- for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
+ State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
- invalidateBlocks.add(b, node, false);
- datanodes.append(node).append(" ");
+ final Block b = getBlockToInvalidate(storedBlock, storage);
+ if (b != null) {
+ invalidateBlocks.add(b, node, false);
+ datanodes.append(node).append(" ");
+ }
}
if (datanodes.length() != 0) {
- blockLog.info("BLOCK* addToInvalidates: {} {}", b, datanodes.toString());
+ blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock,
+ datanodes.toString());
}
}
+ private Block getBlockToInvalidate(BlockInfo storedBlock,
+ DatanodeStorageInfo storage) {
+ return storedBlock.isStriped() ?
+ ((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : storedBlock;
+ }
+
/**
* Remove all block invalidation tasks under this datanode UUID;
* used when a datanode registers with a new UUID and the old one
@@ -1201,7 +1216,7 @@ public class BlockManager {
DatanodeStorageInfo storageInfo,
DatanodeDescriptor node) throws IOException {
- BlockCollection bc = b.corrupted.getBlockCollection();
+ BlockCollection bc = b.stored.getBlockCollection();
if (bc == null) {
blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
" corrupt as it does not belong to any file", b);
@@ -1211,7 +1226,7 @@ public class BlockManager {
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
- storageInfo.addBlock(b.stored, b.reportedBlock);
+ storageInfo.addBlock(b.stored, b.corrupted);
}
// Add this replica to corruptReplicas Map
@@ -1221,8 +1236,7 @@ public class BlockManager {
NumberReplicas numberOfReplicas = countNodes(b.stored);
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc
.getBlockReplication();
- boolean minReplicationSatisfied = checkMinStorage(b.stored,
- numberOfReplicas.liveReplicas());
+ boolean minReplicationSatisfied = hasMinStorage(b.stored, numberOfReplicas.liveReplicas());
boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
bc.getBlockReplication();
@@ -1414,7 +1428,7 @@ public class BlockManager {
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
+ (blockHasEnoughRacks(block, requiredReplication)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
blockLog.info("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
@@ -1497,7 +1511,7 @@ public class BlockManager {
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
+ (blockHasEnoughRacks(block, requiredReplication)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
blockLog.info("BLOCK* Removing {} from neededReplications as" +
@@ -1507,7 +1521,7 @@ public class BlockManager {
}
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
- (!blockHasEnoughRacks(block)) ) {
+ (!blockHasEnoughRacks(block, requiredReplication)) ) {
if (rw.srcNodes[0].getNetworkLocation().equals(
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
//No use continuing, unless a new rack in this case
@@ -1700,7 +1714,7 @@ public class BlockManager {
getStorageBlockIndex(storage));
}
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
- LightWeightLinkedSet<Block> excessBlocks =
+ LightWeightLinkedSet<BlockInfo> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid());
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
@@ -1829,39 +1843,32 @@ public class BlockManager {
* list of blocks that should be considered corrupt due to a block report.
*/
private static class BlockToMarkCorrupt {
- /** The corrupted block in a datanode. */
- final BlockInfo corrupted;
+ /**
+ * The corrupted block in a datanode. This is the one reported by the
+ * datanode.
+ */
+ final Block corrupted;
/** The corresponding block stored in the BlockManager. */
final BlockInfo stored;
- /** The block reported from a datanode */
- final Block reportedBlock;
/** The reason to mark corrupt. */
final String reason;
/** The reason code to be stored */
final Reason reasonCode;
- BlockToMarkCorrupt(Block reported, BlockInfo corrupted,
- BlockInfo stored, String reason, Reason reasonCode) {
- Preconditions.checkNotNull(reported, "reported is null");
+ BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason,
+ Reason reasonCode) {
Preconditions.checkNotNull(corrupted, "corrupted is null");
Preconditions.checkNotNull(stored, "stored is null");
- this.reportedBlock = reported;
this.corrupted = corrupted;
this.stored = stored;
this.reason = reason;
this.reasonCode = reasonCode;
}
- BlockToMarkCorrupt(Block reported, BlockInfo stored, String reason,
- Reason reasonCode) {
- this(reported, stored, stored, reason, reasonCode);
- }
-
- BlockToMarkCorrupt(Block reported, BlockInfo stored, long gs,
+ BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs,
String reason, Reason reasonCode) {
- this(reported, BlockInfo.copyOf(stored), stored, reason,
- reasonCode);
+ this(corrupted, stored, reason, reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
@@ -2077,10 +2084,10 @@ public class BlockManager {
// between the old and new block report.
//
Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
- Collection<Block> toRemove = new TreeSet<Block>();
- Collection<Block> toInvalidate = new LinkedList<Block>();
- Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
- Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+ Collection<BlockInfo> toRemove = new TreeSet<>();
+ Collection<Block> toInvalidate = new LinkedList<>();
+ Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
+ Collection<StatefulBlockInfo> toUC = new LinkedList<>();
reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
@@ -2089,7 +2096,7 @@ public class BlockManager {
for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, storageInfo);
}
- for (Block b : toRemove) {
+ for (BlockInfo b : toRemove) {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
@@ -2224,7 +2231,7 @@ public class BlockManager {
private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
- Collection<Block> toRemove, // remove from DatanodeDescriptor
+ Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
@@ -2259,8 +2266,9 @@ public class BlockManager {
// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
- while(it.hasNext())
+ while (it.hasNext()) {
toRemove.add(it.next());
+ }
storageInfo.removeBlock(delimiter);
}
@@ -2584,7 +2592,7 @@ public class BlockManager {
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
- && checkMinStorage(storedBlock, numCurrentReplica)) {
+ && hasMinStorage(storedBlock, numCurrentReplica)) {
completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
} else if (storedBlock.isComplete()) {
// check whether safe replication is reached for the block
@@ -2659,7 +2667,7 @@ public class BlockManager {
+ pendingReplications.getNumReplicas(storedBlock);
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
- checkMinStorage(storedBlock, numLiveReplicas)) {
+ hasMinStorage(storedBlock, numLiveReplicas)) {
storedBlock = completeBlock(bc, storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
@@ -2697,7 +2705,7 @@ public class BlockManager {
int numCorruptNodes = num.corruptReplicas();
if (numCorruptNodes != corruptReplicasCount) {
LOG.warn("Inconsistent number of corrupt replicas for " +
- storedBlock + "blockMap has " + numCorruptNodes +
+ storedBlock + ". blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + corruptReplicasCount);
}
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
@@ -2971,14 +2979,14 @@ public class BlockManager {
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
*/
- private void processOverReplicatedBlock(final Block block,
+ private void processOverReplicatedBlock(final BlockInfo block,
final short replication, final DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
assert namesystem.hasWriteLock();
if (addedNode == delNodeHint) {
delNodeHint = null;
}
- Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
+ Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@@ -2992,8 +3000,8 @@ public class BlockManager {
postponeBlock(block);
return;
}
- LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
- .getDatanodeUuid());
+ LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+ cur.getDatanodeUuid());
if (excessBlocks == null || !excessBlocks.contains(block)) {
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
// exclude corrupt replicas
@@ -3023,22 +3031,22 @@ public class BlockManager {
* then pick a node with least free space
*/
private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
- Block b, short replication,
+ BlockInfo storedBlock, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
BlockPlacementPolicy replicator) {
assert namesystem.hasWriteLock();
// first form a rack to datanodes map and
- BlockCollection bc = getBlockCollection(b);
- final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
+ BlockCollection bc = getBlockCollection(storedBlock);
+ final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+ bc.getStoragePolicyID());
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
- final Map<String, List<DatanodeStorageInfo>> rackMap
- = new HashMap<String, List<DatanodeStorageInfo>>();
- final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
- final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
+ final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
+ final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
+ final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
// split nodes into two sets
// moreThanOne contains nodes on rack with more than one replica
@@ -3059,7 +3067,7 @@ public class BlockManager {
moreThanOne, excessTypes)) {
cur = delNodeHintStorage;
} else { // regular excessive replica removal
- cur = replicator.chooseReplicaToDelete(bc, b, replication,
+ cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication,
moreThanOne, exactlyOne, excessTypes);
}
firstOne = false;
@@ -3069,7 +3077,7 @@ public class BlockManager {
exactlyOne, cur);
nonExcess.remove(cur);
- addToExcessReplicate(cur.getDatanodeDescriptor(), b);
+ addToExcessReplicate(cur.getDatanodeDescriptor(), storedBlock);
//
// The 'excessblocks' tracks blocks until we get confirmation
@@ -3078,11 +3086,12 @@ public class BlockManager {
//
// The 'invalidate' list is used to inform the datanode the block
// should be deleted. Items are removed from the invalidate list
- // upon giving instructions to the namenode.
+ // upon giving instructions to the datanodes.
//
- addToInvalidates(b, cur.getDatanodeDescriptor());
+ final Block blockToInvalidate = getBlockToInvalidate(storedBlock, cur);
+ addToInvalidates(blockToInvalidate, cur.getDatanodeDescriptor());
blockLog.info("BLOCK* chooseExcessReplicates: "
- +"({}, {}) is added to invalidated blocks set", cur, b);
+ +"({}, {}) is added to invalidated blocks set", cur, storedBlock);
}
}
@@ -3107,17 +3116,18 @@ public class BlockManager {
}
}
- private void addToExcessReplicate(DatanodeInfo dn, Block block) {
+ private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
assert namesystem.hasWriteLock();
- LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
+ LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+ dn.getDatanodeUuid());
if (excessBlocks == null) {
- excessBlocks = new LightWeightLinkedSet<Block>();
+ excessBlocks = new LightWeightLinkedSet<>();
excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
}
- if (excessBlocks.add(block)) {
+ if (excessBlocks.add(storedBlock)) {
excessBlocksCount.incrementAndGet();
blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to"
- + " excessReplicateMap", dn, block);
+ + " excessReplicateMap", dn, storedBlock);
}
}
@@ -3125,14 +3135,13 @@ public class BlockManager {
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
* removed block is still valid.
*/
- public void removeStoredBlock(Block block, DatanodeDescriptor node) {
- blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
+ public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
+ blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
assert (namesystem.hasWriteLock());
{
- BlockInfo storedBlock = getStoredBlock(block);
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
- " removed from node {}", block, node);
+ " removed from node {}", storedBlock, node);
return;
}
@@ -3142,7 +3151,7 @@ public class BlockManager {
// necessary. In that case, put block on a possibly-will-
// be-replicated list.
//
- BlockCollection bc = blocksMap.getBlockCollection(block);
+ BlockCollection bc = storedBlock.getBlockCollection();
if (bc != null) {
namesystem.decrementSafeBlockCount(storedBlock);
updateNeededReplications(storedBlock, -1, 0);
@@ -3152,13 +3161,13 @@ public class BlockManager {
// We've removed a block from a node, so it's definitely no longer
// in "excess" there.
//
- LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
- .getDatanodeUuid());
+ LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+ node.getDatanodeUuid());
if (excessBlocks != null) {
- if (excessBlocks.remove(block)) {
+ if (excessBlocks.remove(storedBlock)) {
excessBlocksCount.decrementAndGet();
blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
- "excessBlocks", block);
+ "excessBlocks", storedBlock);
if (excessBlocks.size() == 0) {
excessReplicateMap.remove(node.getDatanodeUuid());
}
@@ -3166,7 +3175,7 @@ public class BlockManager {
}
// Remove the replica from corruptReplicas
- corruptReplicas.removeFromCorruptReplicasMap(block, node);
+ corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node);
}
}
@@ -3300,7 +3309,7 @@ public class BlockManager {
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
switch (rdbi.getStatus()) {
case DELETED_BLOCK:
- removeStoredBlock(rdbi.getBlock(), node);
+ removeStoredBlock(getStoredBlock(rdbi.getBlock()), node);
deleted++;
break;
case RECEIVED_BLOCK:
@@ -3348,8 +3357,8 @@ public class BlockManager {
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
decommissioned++;
} else {
- LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
- .getDatanodeUuid());
+ LightWeightLinkedSet<BlockInfo> blocksExcess = excessReplicateMap.get(
+ node.getDatanodeUuid());
if (blocksExcess != null && blocksExcess.contains(b)) {
excess++;
} else {
@@ -3402,13 +3411,13 @@ public class BlockManager {
int numOverReplicated = 0;
while(it.hasNext()) {
final BlockInfo block = it.next();
- BlockCollection bc = blocksMap.getBlockCollection(block);
- short expectedReplication = bc.getBlockReplication();
+ int expectedReplication = this.getReplication(block);
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
if (numCurrentReplica > expectedReplication) {
// over-replicated block
- processOverReplicatedBlock(block, expectedReplication, null, null);
+ processOverReplicatedBlock(block, (short) expectedReplication, null,
+ null);
numOverReplicated++;
}
}
@@ -3582,21 +3591,20 @@ public class BlockManager {
return toInvalidate.size();
}
- boolean blockHasEnoughRacks(Block b) {
+ // TODO: update the enough rack logic for striped blocks
+ boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
if (!this.shouldCheckForEnoughRacks) {
return true;
}
boolean enoughRacks = false;
- Collection<DatanodeDescriptor> corruptNodes =
- corruptReplicas.getNodes(b);
- int numExpectedReplicas = getReplication(b);
+ Collection<DatanodeDescriptor> corruptNodes =
+ corruptReplicas.getNodes(storedBlock);
String rackName = null;
- for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
- if (numExpectedReplicas == 1 ||
- (numExpectedReplicas > 1 &&
+ if (expectedStorageNum == 1 || (expectedStorageNum > 1 &&
!datanodeManager.hasClusterEverBeenMultiRack())) {
enoughRacks = true;
break;
@@ -3618,8 +3626,8 @@ public class BlockManager {
* A block needs replication if the number of replicas is less than expected
* or if it does not have enough racks.
*/
- boolean isNeededReplication(Block b, int expected, int current) {
- return current < expected || !blockHasEnoughRacks(b);
+ boolean isNeededReplication(BlockInfo storedBlock, int expected, int current) {
+ return current < expected || !blockHasEnoughRacks(storedBlock, expected);
}
public long getMissingBlocksCount() {
@@ -3791,8 +3799,7 @@ public class BlockManager {
/**
* This class is used internally by {@link this#computeRecoveryWorkForBlocks}
* to represent a task to recover a block through replication or erasure
- * coding. Recovery is done by transferring data from {@link srcNodes} to
- * {@link targets}
+ * coding. Recovery is done by transferring data from srcNodes to targets
*/
private static class BlockRecoveryWork {
protected final BlockInfo block;
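A short sketch of the invalidation dispatch introduced above, using hypothetical stand-in types rather than the real BlockInfo hierarchy; the id-plus-index convention follows getBlockOnStorage() in BlockInfoStriped.

// Stand-in types modeling getBlockToInvalidate(): for a striped group the
// DataNode only knows the internal block whose id is the group id plus the
// storage's block index, so that is what goes on the invalidate list.
class SimpleBlock {
  final long id;
  SimpleBlock(long id) { this.id = id; }
}

class StoredBlockSketch extends SimpleBlock {
  final boolean striped;
  StoredBlockSketch(long id, boolean striped) { super(id); this.striped = striped; }

  SimpleBlock blockToInvalidate(int storageBlockIndex) {
    return striped ? new SimpleBlock(id + storageBlockIndex) : this;
  }
}

public class InvalidateSketch {
  public static void main(String[] args) {
    StoredBlockSketch contiguous = new StoredBlockSketch(42, false);
    StoredBlockSketch group = new StoredBlockSketch(1000, true);
    System.out.println(contiguous.blockToInvalidate(0).id); // 42: the block itself
    System.out.println(group.blockToInvalidate(4).id);      // 1004: internal block #4
  }
}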
http://git-wip-us.apache.org/repos/asf/hadoop/blob/019d211a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index df31d6e..ddce568 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -100,7 +100,7 @@ public class DecommissionManager {
* reports or other events. Before being finally marking as decommissioned,
* another check is done with the actual block map.
*/
- private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
+ private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>>
decomNodeBlocks;
/**
@@ -246,12 +246,12 @@ public class DecommissionManager {
}
/**
- * Checks whether a block is sufficiently replicated for decommissioning.
- * Full-strength replication is not always necessary, hence "sufficient".
+ * Checks whether a block is sufficiently replicated/stored for
+ * decommissioning. For replicated blocks or striped blocks, full-strength
+ * replication or storage is not always necessary, hence "sufficient".
* @return true if sufficient, else false.
*/
- private boolean isSufficientlyReplicated(BlockInfoContiguous block,
- BlockCollection bc,
+ private boolean isSufficient(BlockInfo block, BlockCollection bc,
NumberReplicas numberReplicas) {
final int numExpected = bc.getBlockReplication();
final int numLive = numberReplicas.liveReplicas();
@@ -267,18 +267,19 @@ public class DecommissionManager {
if (numExpected > numLive) {
if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
// Can decom a UC block as long as there will still be minReplicas
- if (numLive >= blockManager.minReplication) {
+ if (blockManager.hasMinStorage(block, numLive)) {
LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
- + ">= minR ({})", block, numLive, blockManager.minReplication);
+ + ">= minR ({})", block, numLive,
+ blockManager.getMinStorageNum(block));
return true;
} else {
LOG.trace("UC block {} insufficiently-replicated since numLive "
+ "({}) < minR ({})", block, numLive,
- blockManager.minReplication);
+ blockManager.getMinStorageNum(block));
}
} else {
// Can decom a non-UC as long as the default replication is met
- if (numLive >= blockManager.defaultReplication) {
+ if (numLive >= blockManager.getDefaultStorageNum(block)) {
return true;
}
}
@@ -413,7 +414,7 @@ public class DecommissionManager {
}
private void check() {
- final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>>
+ final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
@@ -421,10 +422,10 @@ public class DecommissionManager {
&& !exceededNumBlocksPerCheck()
&& !exceededNumNodesPerCheck()) {
numNodesChecked++;
- final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
+ final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
entry = it.next();
final DatanodeDescriptor dn = entry.getKey();
- AbstractList<BlockInfoContiguous> blocks = entry.getValue();
+ AbstractList<BlockInfo> blocks = entry.getValue();
boolean fullScan = false;
if (blocks == null) {
// This is a newly added datanode, run through its list to schedule
@@ -432,14 +433,14 @@ public class DecommissionManager {
// that are insufficiently replicated for further tracking
LOG.debug("Newly-added node {}, doing full scan to find " +
"insufficiently-replicated blocks.", dn);
- blocks = handleInsufficientlyReplicated(dn);
+ blocks = handleInsufficientlyStored(dn);
decomNodeBlocks.put(dn, blocks);
fullScan = true;
} else {
// This is a known datanode, check if its # of insufficiently
// replicated blocks has dropped to zero and if it can be decommed
LOG.debug("Processing decommission-in-progress node {}", dn);
- pruneSufficientlyReplicated(dn, blocks);
+ pruneReliableBlocks(dn, blocks);
}
if (blocks.size() == 0) {
if (!fullScan) {
@@ -451,7 +452,7 @@ public class DecommissionManager {
// marking the datanode as decommissioned
LOG.debug("Node {} has finished replicating current set of "
+ "blocks, checking with the full block map.", dn);
- blocks = handleInsufficientlyReplicated(dn);
+ blocks = handleInsufficientlyStored(dn);
decomNodeBlocks.put(dn, blocks);
}
// If the full scan is clean AND the node liveness is okay,
@@ -492,27 +493,25 @@ public class DecommissionManager {
}
/**
- * Removes sufficiently replicated blocks from the block list of a
- * datanode.
+ * Removes reliable blocks from the block list of a datanode.
*/
- private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
- AbstractList<BlockInfoContiguous> blocks) {
+ private void pruneReliableBlocks(final DatanodeDescriptor datanode,
+ AbstractList<BlockInfo> blocks) {
processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
}
/**
- * Returns a list of blocks on a datanode that are insufficiently
- * replicated, i.e. are under-replicated enough to prevent decommission.
+ * Returns a list of blocks on a datanode that are insufficiently replicated
+ * or require recovery, i.e. blocks that should prevent decommission.
* <p/>
- * As part of this, it also schedules replication work for
- * any under-replicated blocks.
+ * As part of this, it also schedules replication/recovery work.
*
- * @param datanode
- * @return List of insufficiently replicated blocks
+ * @return List of blocks requiring recovery
*/
- private AbstractList<BlockInfoContiguous> handleInsufficientlyReplicated(
+ private AbstractList<BlockInfo> handleInsufficientlyStored(
final DatanodeDescriptor datanode) {
- AbstractList<BlockInfoContiguous> insufficient = new ChunkedArrayList<>();
+ AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
insufficient, false);
return insufficient;
@@ -521,24 +520,22 @@ public class DecommissionManager {
/**
* Used while checking if decommission-in-progress datanodes can be marked
* as decommissioned. Combines shared logic of
- * pruneSufficientlyReplicated and handleInsufficientlyReplicated.
+ * pruneReliableBlocks and handleInsufficientlyStored.
*
* @param datanode Datanode
* @param it Iterator over the blocks on the
* datanode
- * @param insufficientlyReplicated Return parameter. If it's not null,
+ * @param insufficientList Return parameter. If it's not null,
* will contain the insufficiently
* replicated blocks from the list.
- * @param pruneSufficientlyReplicated whether to remove sufficiently
- * replicated blocks from the iterator
- * @return true if there are under-replicated blocks in the provided block
- * iterator, else false.
+ * @param pruneReliableBlocks whether to remove sufficiently reliable
+ * blocks from the iterator
*/
private void processBlocksForDecomInternal(
final DatanodeDescriptor datanode,
- final Iterator<? extends BlockInfo> it,
- final List<BlockInfoContiguous> insufficientlyReplicated,
- boolean pruneSufficientlyReplicated) {
+ final Iterator<BlockInfo> it,
+ final List<BlockInfo> insufficientList,
+ boolean pruneReliableBlocks) {
boolean firstReplicationLog = true;
int underReplicatedBlocks = 0;
int decommissionOnlyReplicas = 0;
@@ -553,7 +550,7 @@ public class DecommissionManager {
it.remove();
continue;
}
- BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+ BlockCollection bc = blockManager.getBlockCollection(block);
if (bc == null) {
// Orphan block, will be invalidated eventually. Skip.
continue;
@@ -561,7 +558,6 @@ public class DecommissionManager {
final NumberReplicas num = blockManager.countNodes(block);
final int liveReplicas = num.liveReplicas();
- final int curReplicas = liveReplicas;
// Schedule under-replicated blocks for replication if not already
// pending
@@ -571,8 +567,7 @@ public class DecommissionManager {
blockManager.pendingReplications.getNumReplicas(block) == 0 &&
namesystem.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
- blockManager.neededReplications.add(block,
- curReplicas,
+ blockManager.neededReplications.add(block, liveReplicas,
num.decommissionedReplicas(),
bc.getBlockReplication());
}
@@ -580,17 +575,16 @@ public class DecommissionManager {
// Even if the block is under-replicated,
// it doesn't block decommission if it's sufficiently replicated
- BlockInfoContiguous blk = (BlockInfoContiguous) block;
- if (isSufficientlyReplicated(blk, bc, num)) {
- if (pruneSufficientlyReplicated) {
+ if (isSufficient(block, bc, num)) {
+ if (pruneReliableBlocks) {
it.remove();
}
continue;
}
// We've found an insufficiently replicated block.
- if (insufficientlyReplicated != null) {
- insufficientlyReplicated.add(blk);
+ if (insufficientList != null) {
+ insufficientList.add(block);
}
// Log if this is our first time through
if (firstReplicationLog) {
@@ -603,7 +597,7 @@ public class DecommissionManager {
if (bc.isUnderConstruction()) {
underReplicatedInOpenFiles++;
}
- if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
+ if ((liveReplicas == 0) && (num.decommissionedReplicas() > 0)) {
decommissionOnlyReplicas++;
}
}
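To summarize the decommission-side change, a condensed model of the isSufficient() branches. All values are assumptions (defaultReplication = 3, minReplication = 1, RS(6,3)), and getDefaultStorageNum() for striped blocks is assumed to be the full group width; this is a sketch, not the actual method.

// Condensed model of the decommission sufficiency check above. The
// getDefaultStorageNum()/getMinStorageNum() values are assumptions.
public class DecomSufficiencySketch {
  static final int DEFAULT_REPLICATION = 3; // assumed dfs.replication default
  static final int MIN_REPLICATION = 1;     // assumed dfs.namenode.replication.min
  static final int DATA_NUM = 6, PARITY_NUM = 3; // assumed RS(6,3) schema

  static int minStorageNum(boolean striped) {
    return striped ? DATA_NUM : MIN_REPLICATION;
  }

  static int defaultStorageNum(boolean striped) {
    return striped ? DATA_NUM + PARITY_NUM : DEFAULT_REPLICATION;
  }

  /** Mirrors isSufficient(): an under-construction last block only needs
   *  the minimum storage, a complete block needs the default. */
  static boolean isSufficient(boolean striped, boolean ucLastBlock,
      int numExpected, int numLive) {
    if (numLive >= numExpected) {
      return true;                                 // fully stored
    }
    return ucLastBlock
        ? numLive >= minStorageNum(striped)        // hasMinStorage()
        : numLive >= defaultStorageNum(striped);   // getDefaultStorageNum()
  }

  public static void main(String[] args) {
    System.out.println(isSufficient(false, false, 3, 2)); // false: below default
    System.out.println(isSufficient(false, true, 3, 1));  // true: UC block at minR
    System.out.println(isSufficient(true, false, 9, 8));  // false: group not at full width
    System.out.println(isSufficient(true, true, 9, 6));   // true: all data blocks present
  }
}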
http://git-wip-us.apache.org/repos/asf/hadoop/blob/019d211a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 75d399a..fd36a07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3810,7 +3810,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (trackBlockCounts) {
if (b.isComplete()) {
numRemovedComplete++;
- if (blockManager.checkMinStorage(b, b.numNodes())) {
+ if (blockManager.hasMinStorage(b, b.numNodes())) {
numRemovedSafe++;
}
}
@@ -4039,7 +4039,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
curBlock = blocks[nrCompleteBlocks];
if(!curBlock.isComplete())
break;
- assert blockManager.checkMinStorage(curBlock) :
+ assert blockManager.hasMinStorage(curBlock) :
"A COMPLETE block is not minimally replicated in " + src;
}
@@ -4075,7 +4075,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// If penultimate block doesn't exist then its minReplication is met
boolean penultimateBlockMinStorage = penultimateBlock == null ||
- blockManager.checkMinStorage(penultimateBlock);
+ blockManager.hasMinStorage(penultimateBlock);
switch(lastBlockState) {
case COMPLETE:
@@ -4084,7 +4084,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
case COMMITTED:
// Close file if committed blocks are minimally replicated
if(penultimateBlockMinStorage &&
- blockManager.checkMinStorage(lastBlock)) {
+ blockManager.hasMinStorage(lastBlock)) {
finalizeINodeFileUnderConstruction(src, pendingFile,
iip.getLatestSnapshotId());
NameNode.stateChangeLog.warn("BLOCK*"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/019d211a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
index 1c3f075..c33667d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
@@ -100,7 +100,7 @@ public class TestNodeCount {
DatanodeDescriptor nonExcessDN = null;
for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
- Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
+ Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
nonExcessDN = dn;
break;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/019d211a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index 2d7bb44..83b3aa0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -42,7 +41,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.util.Time;
import org.junit.Test;
public class TestOverReplicatedBlocks {
@@ -185,7 +183,7 @@ public class TestOverReplicatedBlocks {
// All replicas for deletion should be scheduled on lastDN.
// And should not actually be deleted, because lastDN does not heartbeat.
namesystem.readLock();
- Collection<Block> dnBlocks =
+ Collection<BlockInfo> dnBlocks =
namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
assertEquals("Replicas on node " + lastDNid + " should have been deleted",
SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());