You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/06/30 01:30:52 UTC
[36/50] [abbrv] hadoop git commit: HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery blocks. Contributed by Zhe Zhang.
HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery blocks. Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b9dec68e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b9dec68e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b9dec68e
Branch: refs/heads/YARN-2928
Commit: b9dec68e0e7a69dbf8c556edcd956c640c12b008
Parents: 9c96ce7
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Jun 26 10:49:01 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Jun 29 10:28:27 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/blockmanagement/BlockInfo.java | 12 +-
.../blockmanagement/BlockInfoContiguous.java | 2 +-
.../BlockInfoUnderConstructionContiguous.java | 2 +-
.../server/blockmanagement/BlockManager.java | 595 ++++++++++---------
.../blockmanagement/DatanodeStorageInfo.java | 15 +-
.../hdfs/server/namenode/FSNamesystem.java | 18 +-
.../hdfs/server/namenode/NamenodeFsck.java | 2 +-
.../server/blockmanagement/TestBlockInfo.java | 2 +-
.../blockmanagement/TestBlockManager.java | 4 +-
.../server/blockmanagement/TestNodeCount.java | 2 +-
.../TestOverReplicatedBlocks.java | 4 +-
.../blockmanagement/TestReplicationPolicy.java | 2 +-
13 files changed, 361 insertions(+), 302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 27e2e89..bb1b3ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -679,6 +679,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8651. Make hadoop-hdfs-project Native code -Wall-clean (Alan Burlison
via Colin P. McCabe)
+ HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery
+ blocks. (Zhe Zhang via jing9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index 4cc2791..5ad992b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -172,19 +172,23 @@ public abstract class BlockInfo extends Block
public abstract int numNodes();
/**
- * Add a {@link DatanodeStorageInfo} location for a block.
+ * Add a {@link DatanodeStorageInfo} location for a block
+ * @param storage The storage to add
+ * @param reportedBlock The block reported from the datanode. This is only
+ * used by erasure coded blocks, this block's id contains
+ * information indicating the index of the block in the
+ * corresponding block group.
*/
- abstract boolean addStorage(DatanodeStorageInfo storage);
+ abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
/**
* Remove {@link DatanodeStorageInfo} location for a block
*/
abstract boolean removeStorage(DatanodeStorageInfo storage);
-
/**
* Replace the current BlockInfo with the new one in corresponding
- * DatanodeStorageInfo's linked list
+ * DatanodeStorageInfo's linked list.
*/
abstract void replaceBlock(BlockInfo newBlock);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index b9abcd0..de64ad8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -45,7 +45,7 @@ public class BlockInfoContiguous extends BlockInfo {
}
@Override
- boolean addStorage(DatanodeStorageInfo storage) {
+ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
return ContiguousBlockStorageOp.addStorage(this, storage);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
index c66675a..d3cb337 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
@@ -69,7 +69,7 @@ public class BlockInfoUnderConstructionContiguous extends
}
@Override
- boolean addStorage(DatanodeStorageInfo storage) {
+ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
return ContiguousBlockStorageOp.addStorage(this, storage);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 368d3b0..5bd4980 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -198,8 +198,8 @@ public class BlockManager {
* Maps a StorageID to the set of blocks that are "extra" for this
* DataNode. We'll eventually remove these extras.
*/
- public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
- new TreeMap<String, LightWeightLinkedSet<Block>>();
+ public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap =
+ new TreeMap<>();
/**
* Store set of Blocks that need to be replicated 1 or more times.
@@ -502,8 +502,8 @@ public class BlockManager {
/** Dump meta data to out. */
public void metaSave(PrintWriter out) {
assert namesystem.hasWriteLock();
- final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
- final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+ final List<DatanodeDescriptor> live = new ArrayList<>();
+ final List<DatanodeDescriptor> dead = new ArrayList<>();
datanodeManager.fetchDatanodes(live, dead, false);
out.println("Live Datanodes: " + live.size());
out.println("Dead Datanodes: " + dead.size());
@@ -542,8 +542,8 @@ public class BlockManager {
List<DatanodeDescriptor> containingNodes =
new ArrayList<DatanodeDescriptor>();
List<DatanodeStorageInfo> containingLiveReplicasNodes =
- new ArrayList<DatanodeStorageInfo>();
-
+ new ArrayList<>();
+
NumberReplicas numReplicas = new NumberReplicas();
// source node returned is not used
chooseSourceDatanode(block, containingNodes,
@@ -572,7 +572,7 @@ public class BlockManager {
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(block);
- for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+ for (DatanodeStorageInfo storage : getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
String state = "";
if (corruptNodes != null && corruptNodes.contains(node)) {
@@ -595,11 +595,23 @@ public class BlockManager {
return maxReplicationStreams;
}
+ public int getDefaultStorageNum(BlockInfo block) {
+ return defaultReplication;
+ }
+
+ public short getMinStorageNum(BlockInfo block) {
+ return minReplication;
+ }
+
/**
- * @return true if the block has minimum replicas
+ * @return true if the block has minimum stored copies
*/
- public boolean checkMinReplication(BlockInfo block) {
- return (countNodes(block).liveReplicas() >= minReplication);
+ public boolean hasMinStorage(BlockInfo block) {
+ return hasMinStorage(block, countNodes(block).liveReplicas());
+ }
+
+ public boolean hasMinStorage(BlockInfo block, int liveNum) {
+ return liveNum >= getMinStorageNum(block);
}
/**
@@ -614,8 +626,9 @@ public class BlockManager {
private static boolean commitBlock(
final BlockInfoUnderConstruction block, final Block commitBlock)
throws IOException {
- if (block.getBlockUCState() == BlockUCState.COMMITTED)
+ if (block.getBlockUCState() == BlockUCState.COMMITTED) {
return false;
+ }
assert block.getNumBytes() <= commitBlock.getNumBytes() :
"commitBlock length is less than the stored one "
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
@@ -635,18 +648,22 @@ public class BlockManager {
*/
public boolean commitOrCompleteLastBlock(BlockCollection bc,
Block commitBlock) throws IOException {
- if(commitBlock == null)
+ if (commitBlock == null) {
return false; // not committing, this is a block allocation retry
+ }
BlockInfo lastBlock = bc.getLastBlock();
- if(lastBlock == null)
+ if (lastBlock == null) {
return false; // no blocks in file yet
- if(lastBlock.isComplete())
+ }
+ if (lastBlock.isComplete()) {
return false; // already completed (e.g. by syncBlock)
-
+ }
+
final boolean b = commitBlock(
(BlockInfoUnderConstruction) lastBlock, commitBlock);
- if(countNodes(lastBlock).liveReplicas() >= minReplication)
+ if(hasMinStorage(lastBlock)) {
completeBlock(bc, bc.numBlocks()-1, false);
+ }
return b;
}
@@ -659,20 +676,24 @@ public class BlockManager {
*/
private BlockInfo completeBlock(final BlockCollection bc,
final int blkIndex, boolean force) throws IOException {
- if(blkIndex < 0)
+ if (blkIndex < 0) {
return null;
+ }
BlockInfo curBlock = bc.getBlocks()[blkIndex];
- if(curBlock.isComplete())
+ if(curBlock.isComplete()) {
return curBlock;
+ }
BlockInfoUnderConstruction ucBlock =
(BlockInfoUnderConstruction) curBlock;
int numNodes = ucBlock.numNodes();
- if (!force && numNodes < minReplication)
+ if (!force && !hasMinStorage(curBlock, numNodes)) {
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
- if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
+ }
+ if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) {
throw new IOException(
"Cannot complete block: block has not been COMMITTED by the client");
+ }
BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
// replace penultimate block in file
bc.setBlock(blkIndex, completeBlock);
@@ -757,7 +778,7 @@ public class BlockManager {
// count in safe-mode.
namesystem.adjustSafeModeBlockTotals(
// decrement safe if we had enough
- targets.length >= minReplication ? -1 : 0,
+ hasMinStorage(oldBlock, targets.length) ? -1 : 0,
// always decrement total blocks
-1);
@@ -771,8 +792,8 @@ public class BlockManager {
*/
private List<DatanodeStorageInfo> getValidLocations(Block block) {
final List<DatanodeStorageInfo> locations
- = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
- for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+ = new ArrayList<>(blocksMap.numNodes(block));
+ for(DatanodeStorageInfo storage : getStorages(block)) {
// filter invalidate replicas
if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
locations.add(storage);
@@ -785,7 +806,7 @@ public class BlockManager {
final BlockInfo[] blocks,
final long offset, final long length, final int nrBlocksToReturn,
final AccessMode mode) throws IOException {
- int curBlk = 0;
+ int curBlk;
long curPos = 0, blkSize = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -798,10 +819,10 @@ public class BlockManager {
}
if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
- return Collections.<LocatedBlock>emptyList();
+ return Collections.emptyList();
long endOff = offset + length;
- List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
+ List<LocatedBlock> results = new ArrayList<>(blocks.length);
do {
results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
curPos += blocks[curBlk].getNumBytes();
@@ -814,7 +835,7 @@ public class BlockManager {
private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
final long endPos, final AccessMode mode) throws IOException {
- int curBlk = 0;
+ int curBlk;
long curPos = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -838,8 +859,8 @@ public class BlockManager {
}
/** @return a LocatedBlock for the given block */
- private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
- ) throws IOException {
+ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
+ throws IOException {
if (blk instanceof BlockInfoUnderConstruction) {
if (blk.isComplete()) {
throw new IOException(
@@ -849,7 +870,8 @@ public class BlockManager {
final BlockInfoUnderConstruction uc =
(BlockInfoUnderConstruction) blk;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
- final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+ final ExtendedBlock eb =
+ new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return newLocatedBlock(eb, storages, pos, false);
}
@@ -869,11 +891,12 @@ public class BlockManager {
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
int j = 0;
if (numMachines > 0) {
- for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
+ for(DatanodeStorageInfo storage : getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
- if (isCorrupt || (!replicaCorrupt))
+ if (isCorrupt || (!replicaCorrupt)) {
machines[j++] = storage;
+ }
}
}
assert j == machines.length :
@@ -1047,7 +1070,7 @@ public class BlockManager {
for(int i=0; i<startBlock; i++) {
iter.next();
}
- List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+ List<BlockWithLocations> results = new ArrayList<>();
long totalSize = 0;
BlockInfo curBlock;
while(totalSize<size && iter.hasNext()) {
@@ -1071,7 +1094,7 @@ public class BlockManager {
/** Remove the blocks associated to the given datanode. */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
- final Iterator<? extends Block> it = node.getBlockIterator();
+ final Iterator<BlockInfo> it = node.getBlockIterator();
while(it.hasNext()) {
removeStoredBlock(it.next(), node);
}
@@ -1085,10 +1108,10 @@ public class BlockManager {
/** Remove the blocks associated to the given DatanodeStorageInfo. */
void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
assert namesystem.hasWriteLock();
- final Iterator<? extends Block> it = storageInfo.getBlockIterator();
+ final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
while(it.hasNext()) {
- Block block = it.next();
+ BlockInfo block = it.next();
removeStoredBlock(block, node);
invalidateBlocks.remove(node, block);
}
@@ -1110,18 +1133,20 @@ public class BlockManager {
* Adds block to list of blocks which will be invalidated on all its
* datanodes.
*/
- private void addToInvalidates(Block b) {
+ private void addToInvalidates(BlockInfo storedBlock) {
if (!namesystem.isPopulatingReplQueues()) {
return;
}
StringBuilder datanodes = new StringBuilder();
- for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
+ State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
- invalidateBlocks.add(b, node, false);
+ invalidateBlocks.add(storedBlock, node, false);
datanodes.append(node).append(" ");
}
if (datanodes.length() != 0) {
- blockLog.info("BLOCK* addToInvalidates: {} {}", b, datanodes.toString());
+ blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock,
+ datanodes.toString());
}
}
@@ -1148,7 +1173,8 @@ public class BlockManager {
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn, String storageID, String reason) throws IOException {
assert namesystem.hasWriteLock();
- final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
+ final Block reportedBlock = blk.getLocalBlock();
+ final BlockInfo storedBlock = getStoredBlock(reportedBlock);
if (storedBlock == null) {
// Check if the replica is in the blockMap, if not
// ignore the request for now. This could happen when BlockScanner
@@ -1164,8 +1190,8 @@ public class BlockManager {
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+ ") does not exist");
}
-
- markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+
+ markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
storageID == null ? null : node.getStorageInfo(storageID),
node);
@@ -1181,18 +1207,18 @@ public class BlockManager {
DatanodeStorageInfo storageInfo,
DatanodeDescriptor node) throws IOException {
- if (b.corrupted.isDeleted()) {
+ if (b.stored.isDeleted()) {
blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
" corrupt as it does not belong to any file", b);
addToInvalidates(b.corrupted, node);
return;
}
short expectedReplicas =
- b.corrupted.getBlockCollection().getPreferredBlockReplication();
+ getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored);
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
- storageInfo.addBlock(b.stored);
+ storageInfo.addBlock(b.stored, b.corrupted);
}
// Add this replica to corruptReplicas Map
@@ -1202,8 +1228,8 @@ public class BlockManager {
NumberReplicas numberOfReplicas = countNodes(b.stored);
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
expectedReplicas;
- boolean minReplicationSatisfied =
- numberOfReplicas.liveReplicas() >= minReplication;
+ boolean minReplicationSatisfied = hasMinStorage(b.stored,
+ numberOfReplicas.liveReplicas());
boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
expectedReplicas;
@@ -1346,7 +1372,7 @@ public class BlockManager {
int additionalReplRequired;
int scheduledWork = 0;
- List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+ List<ReplicationWork> work = new LinkedList<>();
namesystem.writeLock();
try {
@@ -1363,11 +1389,11 @@ public class BlockManager {
continue;
}
- requiredReplication = bc.getPreferredBlockReplication();
+ requiredReplication = getExpectedReplicaNum(bc, block);
// get a source data-node
- containingNodes = new ArrayList<DatanodeDescriptor>();
- List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
+ containingNodes = new ArrayList<>();
+ List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas();
srcNode = chooseSourceDatanode(
block, containingNodes, liveReplicaNodes, numReplicas,
@@ -1387,7 +1413,7 @@ public class BlockManager {
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
+ (blockHasEnoughRacks(block, requiredReplication)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
blockLog.info("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
@@ -1411,7 +1437,7 @@ public class BlockManager {
namesystem.writeUnlock();
}
- final Set<Node> excludedNodes = new HashSet<Node>();
+ final Set<Node> excludedNodes = new HashSet<>();
for(ReplicationWork rw : work){
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
@@ -1447,7 +1473,7 @@ public class BlockManager {
rw.targets = null;
continue;
}
- requiredReplication = bc.getPreferredBlockReplication();
+ requiredReplication = getExpectedReplicaNum(bc, block);
// do not schedule more if enough replicas is already pending
NumberReplicas numReplicas = countNodes(block);
@@ -1456,7 +1482,7 @@ public class BlockManager {
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
+ (blockHasEnoughRacks(block, requiredReplication)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
blockLog.info("BLOCK* Removing {} from neededReplications as" +
@@ -1466,7 +1492,7 @@ public class BlockManager {
}
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
- (!blockHasEnoughRacks(block)) ) {
+ (!blockHasEnoughRacks(block, requiredReplication)) ) {
if (rw.srcNode.getNetworkLocation().equals(
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
//No use continuing, unless a new rack in this case
@@ -1581,7 +1607,7 @@ public class BlockManager {
List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
List<DatanodeDescriptor> datanodeDescriptors = null;
if (nodes != null) {
- datanodeDescriptors = new ArrayList<DatanodeDescriptor>(nodes.size());
+ datanodeDescriptors = new ArrayList<>(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
if (node != null) {
@@ -1637,9 +1663,9 @@ public class BlockManager {
int excess = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
- for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+ for(DatanodeStorageInfo storage : getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
- LightWeightLinkedSet<Block> excessBlocks =
+ LightWeightLinkedSet<BlockInfo> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid());
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
@@ -1707,7 +1733,7 @@ public class BlockManager {
* Use the blockinfo from the blocksmap to be certain we're working
* with the most up-to-date block information (e.g. genstamp).
*/
- BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]);
+ BlockInfo bi = getStoredBlock(timedOutItems[i]);
if (bi == null) {
continue;
}
@@ -1757,7 +1783,7 @@ public class BlockManager {
final BlockInfoUnderConstruction storedBlock;
final Block reportedBlock;
final ReplicaState reportedState;
-
+
StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
Block reportedBlock, ReplicaState reportedState) {
this.storedBlock = storedBlock;
@@ -1765,14 +1791,34 @@ public class BlockManager {
this.reportedState = reportedState;
}
}
-
+
+ private static class BlockInfoToAdd {
+ private final BlockInfo stored;
+ private final Block reported;
+
+ BlockInfoToAdd(BlockInfo stored, Block reported) {
+ this.stored = stored;
+ this.reported = reported;
+ }
+
+ public BlockInfo getStored() {
+ return stored;
+ }
+
+ public Block getReported() {
+ return reported;
+ }
+ }
+
/**
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
* list of blocks that should be considered corrupt due to a block report.
*/
private static class BlockToMarkCorrupt {
- /** The corrupted block in a datanode. */
- final BlockInfo corrupted;
+ /** The corrupted block in a datanode. This is the one reported by the
+ * datanode.
+ */
+ final Block corrupted;
/** The corresponding block stored in the BlockManager. */
final BlockInfo stored;
/** The reason to mark corrupt. */
@@ -1780,7 +1826,7 @@ public class BlockManager {
/** The reason code to be stored */
final Reason reasonCode;
- BlockToMarkCorrupt(BlockInfo corrupted,
+ BlockToMarkCorrupt(Block corrupted,
BlockInfo stored, String reason,
Reason reasonCode) {
Preconditions.checkNotNull(corrupted, "corrupted is null");
@@ -1792,15 +1838,9 @@ public class BlockManager {
this.reasonCode = reasonCode;
}
- BlockToMarkCorrupt(BlockInfo stored, String reason,
- Reason reasonCode) {
- this(stored, stored, reason, reasonCode);
- }
-
- BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
- Reason reasonCode) {
- this(new BlockInfoContiguous(stored), stored,
- reason, reasonCode);
+ BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs,
+ String reason, Reason reasonCode) {
+ this(corrupted, stored, reason, reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
@@ -1987,7 +2027,7 @@ public class BlockManager {
break;
}
- BlockInfo bi = blocksMap.getStoredBlock(b);
+ BlockInfo bi = getStoredBlock(b);
if (bi == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
@@ -2019,7 +2059,7 @@ public class BlockManager {
endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
}
}
-
+
private Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
@@ -2027,25 +2067,26 @@ public class BlockManager {
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
//
- Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
- Collection<Block> toRemove = new TreeSet<Block>();
- Collection<Block> toInvalidate = new LinkedList<Block>();
- Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
- Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+ Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
+ Collection<BlockInfo> toRemove = new TreeSet<>();
+ Collection<Block> toInvalidate = new LinkedList<>();
+ Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
+ Collection<StatefulBlockInfo> toUC = new LinkedList<>();
reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
- for (StatefulBlockInfo b : toUC) {
+ for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, storageInfo);
}
- for (Block b : toRemove) {
+ for (BlockInfo b : toRemove) {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
- for (BlockInfo b : toAdd) {
- addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
+ for (BlockInfoToAdd b : toAdd) {
+ addStoredBlock(b.getStored(), b.getReported(), storageInfo, null,
+ numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2066,17 +2107,17 @@ public class BlockManager {
* Mark block replicas as corrupt except those on the storages in
* newStorages list.
*/
- public void markBlockReplicasAsCorrupt(BlockInfo block,
- long oldGenerationStamp, long oldNumBytes,
+ public void markBlockReplicasAsCorrupt(Block oldBlock, BlockInfo block,
+ long oldGenerationStamp, long oldNumBytes,
DatanodeStorageInfo[] newStorages) throws IOException {
assert namesystem.hasWriteLock();
BlockToMarkCorrupt b = null;
if (block.getGenerationStamp() != oldGenerationStamp) {
- b = new BlockToMarkCorrupt(block, oldGenerationStamp,
+ b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
"genstamp does not match " + oldGenerationStamp
+ " : " + block.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (block.getNumBytes() != oldNumBytes) {
- b = new BlockToMarkCorrupt(block,
+ b = new BlockToMarkCorrupt(oldBlock, block,
"length does not match " + oldNumBytes
+ " : " + block.getNumBytes(), Reason.SIZE_MISMATCH);
} else {
@@ -2135,7 +2176,7 @@ public class BlockManager {
continue;
}
- BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
+ BlockInfo storedBlock = getStoredBlock(iblk);
// If block does not belong to any file, we are done.
if (storedBlock == null) continue;
@@ -2173,24 +2214,26 @@ public class BlockManager {
}
//add replica if appropriate
if (reportedState == ReplicaState.FINALIZED) {
- addStoredBlockImmediate(storedBlock, storageInfo);
+ addStoredBlockImmediate(storedBlock, iblk, storageInfo);
}
}
}
- private void reportDiff(DatanodeStorageInfo storageInfo,
- BlockListAsLongs newReport,
- Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
- Collection<Block> toRemove, // remove from DatanodeDescriptor
+ private void reportDiff(DatanodeStorageInfo storageInfo,
+ BlockListAsLongs newReport,
+ Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
+ Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
- // place a delimiter in the list which separates blocks
+ // place a delimiter in the list which separates blocks
// that have been reported from those that have not
- BlockInfo delimiter = new BlockInfoContiguous(new Block(), (short) 1);
- AddBlockResult result = storageInfo.addBlock(delimiter);
- assert result == AddBlockResult.ADDED
+ Block delimiterBlock = new Block();
+ BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
+ (short) 1);
+ AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
+ assert result == AddBlockResult.ADDED
: "Delimiting block cannot be present in the node";
int headIndex = 0; //currently the delimiter is in the head of the list
int curIndex;
@@ -2207,7 +2250,8 @@ public class BlockManager {
// move block to the head of the list
if (storedBlock != null &&
(curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
- headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
+ headIndex =
+ storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
}
}
@@ -2215,8 +2259,9 @@ public class BlockManager {
// all of them are next to the delimiter
Iterator<BlockInfo> it =
storageInfo.new BlockIterator(delimiter.getNext(0));
- while(it.hasNext())
+ while (it.hasNext()) {
toRemove.add(it.next());
+ }
storageInfo.removeBlock(delimiter);
}
@@ -2253,12 +2298,12 @@ public class BlockManager {
*/
private BlockInfo processReportedBlock(
final DatanodeStorageInfo storageInfo,
- final Block block, final ReplicaState reportedState,
- final Collection<BlockInfo> toAdd,
- final Collection<Block> toInvalidate,
+ final Block block, final ReplicaState reportedState,
+ final Collection<BlockInfoToAdd> toAdd,
+ final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
-
+
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
if(LOG.isDebugEnabled()) {
@@ -2266,16 +2311,16 @@ public class BlockManager {
+ " on " + dn + " size " + block.getNumBytes()
+ " replicaState = " + reportedState);
}
-
+
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(block)) {
queueReportedBlock(storageInfo, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
return null;
}
-
+
// find block by blockId
- BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+ BlockInfo storedBlock = getStoredBlock(block);
if(storedBlock == null) {
// If blocksMap does not contain reported block id,
// the replica should be removed from the data-node.
@@ -2283,7 +2328,7 @@ public class BlockManager {
return null;
}
BlockUCState ucState = storedBlock.getBlockUCState();
-
+
// Block is on the NN
if(LOG.isDebugEnabled()) {
LOG.debug("In memory blockUCState = " + ucState);
@@ -2328,8 +2373,8 @@ public class BlockManager {
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
&& (storedBlock.findStorageInfo(storageInfo) == -1 ||
- corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
- toAdd.add(storedBlock);
+ corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+ toAdd.add(new BlockInfoToAdd(storedBlock, block));
}
return storedBlock;
}
@@ -2375,7 +2420,7 @@ public class BlockManager {
if (rbi.getReportedState() == null) {
// This is a DELETE_BLOCK request
DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
- removeStoredBlock(rbi.getBlock(),
+ removeStoredBlock(getStoredBlock(rbi.getBlock()),
storageInfo.getDatanodeDescriptor());
} else {
processAndHandleReportedBlock(rbi.getStorageInfo(),
@@ -2423,15 +2468,15 @@ public class BlockManager {
case COMMITTED:
if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
- return new BlockToMarkCorrupt(storedBlock, reportedGS,
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
"block is " + ucState + " and reported genstamp " + reportedGS
- + " does not match genstamp in block map "
- + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+ + " does not match genstamp in block map "
+ + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
- return new BlockToMarkCorrupt(storedBlock,
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock,
"block is " + ucState + " and reported length " +
- reported.getNumBytes() + " does not match " +
- "length in block map " + storedBlock.getNumBytes(),
+ reported.getNumBytes() + " does not match " +
+ "length in block map " + storedBlock.getNumBytes(),
Reason.SIZE_MISMATCH);
} else {
return null; // not corrupt
@@ -2439,11 +2484,12 @@ public class BlockManager {
case UNDER_CONSTRUCTION:
if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
- return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
- + ucState + " and reported state " + reportedState
- + ", But reported genstamp " + reportedGS
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock,
+ reportedGS, "block is " + ucState + " and reported state "
+ + reportedState + ", But reported genstamp " + reportedGS
+ " does not match genstamp in block map "
- + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+ + storedBlock.getGenerationStamp(),
+ Reason.GENSTAMP_MISMATCH);
}
return null;
default:
@@ -2453,12 +2499,15 @@ public class BlockManager {
case RWR:
if (!storedBlock.isComplete()) {
return null; // not corrupt
- } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
+ } else if (storedBlock.getGenerationStamp() !=
+ reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
- return new BlockToMarkCorrupt(storedBlock, reportedGS,
- "reported " + reportedState + " replica with genstamp " + reportedGS
- + " does not match COMPLETE block's genstamp in block map "
- + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+ return new BlockToMarkCorrupt(
+ new Block(reported), storedBlock, reportedGS,
+ "reported " + reportedState +
+ " replica with genstamp " + reportedGS +
+ " does not match COMPLETE block's genstamp in block map " +
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else { // COMPLETE block, same genstamp
if (reportedState == ReplicaState.RBW) {
// If it's a RBW report for a COMPLETE block, it may just be that
@@ -2470,7 +2519,7 @@ public class BlockManager {
"complete with the same genstamp");
return null;
} else {
- return new BlockToMarkCorrupt(storedBlock,
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock,
"reported replica has invalid state " + reportedState,
Reason.INVALID_STATE);
}
@@ -2483,7 +2532,8 @@ public class BlockManager {
" on " + dn + " size " + storedBlock.getNumBytes();
// log here at WARN level since this is really a broken HDFS invariant
LOG.warn(msg);
- return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
+ return new BlockToMarkCorrupt(new Block(reported), storedBlock, msg,
+ Reason.INVALID_STATE);
}
}
@@ -2516,7 +2566,7 @@ public class BlockManager {
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
(block.findStorageInfo(storageInfo) < 0)) {
- addStoredBlock(block, storageInfo, null, true);
+ addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
}
}
@@ -2531,23 +2581,23 @@ public class BlockManager {
*
* @throws IOException
*/
- private void addStoredBlockImmediate(BlockInfo storedBlock,
+ private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
DatanodeStorageInfo storageInfo)
- throws IOException {
+ throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
- if (!namesystem.isInStartupSafeMode()
+ if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) {
- addStoredBlock(storedBlock, storageInfo, null, false);
+ addStoredBlock(storedBlock, reported, storageInfo, null, false);
return;
}
// just add it
- AddBlockResult result = storageInfo.addBlock(storedBlock);
+ AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
- && numCurrentReplica >= minReplication) {
+ && hasMinStorage(storedBlock, numCurrentReplica)) {
completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
@@ -2561,19 +2611,20 @@ public class BlockManager {
/**
* Modify (block-->datanode) map. Remove block from set of
* needed replications if this takes care of the problem.
- * @return the block that is stored in blockMap.
+ * @return the block that is stored in blocksMap.
*/
private Block addStoredBlock(final BlockInfo block,
- DatanodeStorageInfo storageInfo,
- DatanodeDescriptor delNodeHint,
- boolean logEveryBlock)
- throws IOException {
+ final Block reportedBlock,
+ DatanodeStorageInfo storageInfo,
+ DatanodeDescriptor delNodeHint,
+ boolean logEveryBlock)
+ throws IOException {
assert block != null && namesystem.hasWriteLock();
BlockInfo storedBlock;
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
if (block instanceof BlockInfoUnderConstruction) {
//refresh our copy in case the block got completed in another thread
- storedBlock = blocksMap.getStoredBlock(block);
+ storedBlock = getStoredBlock(block);
} else {
storedBlock = block;
}
@@ -2587,10 +2638,9 @@ public class BlockManager {
return block;
}
BlockCollection bc = storedBlock.getBlockCollection();
- assert bc != null : "Block must belong to a file";
// add block to the datanode
- AddBlockResult result = storageInfo.addBlock(storedBlock);
+ AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
int curReplicaDelta;
if (result == AddBlockResult.ADDED) {
@@ -2618,10 +2668,10 @@ public class BlockManager {
NumberReplicas num = countNodes(storedBlock);
int numLiveReplicas = num.liveReplicas();
int numCurrentReplica = numLiveReplicas
- + pendingReplications.getNumReplicas(storedBlock);
+ + pendingReplications.getNumReplicas(storedBlock);
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
- numLiveReplicas >= minReplication) {
+ hasMinStorage(storedBlock, numLiveReplicas)) {
storedBlock = completeBlock(bc, storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
@@ -2631,7 +2681,7 @@ public class BlockManager {
// handles the safe block count maintenance.
namesystem.incrementSafeBlockCount(numCurrentReplica);
}
-
+
// if file is under construction, then done for now
if (bc.isUnderConstruction()) {
return storedBlock;
@@ -2643,7 +2693,7 @@ public class BlockManager {
}
// handle underReplication/overReplication
- short fileReplication = bc.getPreferredBlockReplication();
+ short fileReplication = getExpectedReplicaNum(bc, storedBlock);
if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica,
num.decommissionedAndDecommissioning(), fileReplication);
@@ -2659,11 +2709,12 @@ public class BlockManager {
int numCorruptNodes = num.corruptReplicas();
if (numCorruptNodes != corruptReplicasCount) {
LOG.warn("Inconsistent number of corrupt replicas for " +
- storedBlock + "blockMap has " + numCorruptNodes +
+ storedBlock + ". blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + corruptReplicasCount);
}
- if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
- invalidateCorruptReplicas(storedBlock);
+ if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
+ invalidateCorruptReplicas(storedBlock, reportedBlock);
+ }
return storedBlock;
}
@@ -2695,7 +2746,7 @@ public class BlockManager {
*
* @param blk Block whose corrupt replicas need to be invalidated
*/
- private void invalidateCorruptReplicas(BlockInfo blk) {
+ private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
boolean removedFromBlocksMap = true;
if (nodes == null)
@@ -2705,8 +2756,8 @@ public class BlockManager {
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
for (DatanodeDescriptor node : nodesCopy) {
try {
- if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
- Reason.ANY), node)) {
+ if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
+ Reason.ANY), node)) {
removedFromBlocksMap = false;
}
} catch (IOException e) {
@@ -2874,7 +2925,7 @@ public class BlockManager {
}
// calculate current replication
short expectedReplication =
- block.getBlockCollection().getPreferredBlockReplication();
+ getExpectedReplicaNum(block.getBlockCollection(), block);
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be
@@ -2933,14 +2984,14 @@ public class BlockManager {
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
*/
- private void processOverReplicatedBlock(final Block block,
+ private void processOverReplicatedBlock(final BlockInfo block,
final short replication, final DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
assert namesystem.hasWriteLock();
if (addedNode == delNodeHint) {
delNodeHint = null;
}
- Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
+ Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@@ -2954,8 +3005,8 @@ public class BlockManager {
postponeBlock(block);
return;
}
- LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
- .getDatanodeUuid());
+ LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+ cur.getDatanodeUuid());
if (excessBlocks == null || !excessBlocks.contains(block)) {
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
// exclude corrupt replicas
@@ -2965,7 +3016,7 @@ public class BlockManager {
}
}
}
- chooseExcessReplicates(nonExcess, block, replication,
+ chooseExcessReplicates(nonExcess, block, replication,
addedNode, delNodeHint, blockplacement);
}
@@ -2984,29 +3035,29 @@ public class BlockManager {
* If no such a node is available,
* then pick a node with least free space
*/
- private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
- Block b, short replication,
- DatanodeDescriptor addedNode,
- DatanodeDescriptor delNodeHint,
- BlockPlacementPolicy replicator) {
+ private void chooseExcessReplicates(
+ final Collection<DatanodeStorageInfo> nonExcess,
+ BlockInfo storedBlock, short replication,
+ DatanodeDescriptor addedNode,
+ DatanodeDescriptor delNodeHint,
+ BlockPlacementPolicy replicator) {
assert namesystem.hasWriteLock();
// first form a rack to datanodes map and
- BlockCollection bc = getBlockCollection(b);
- final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
+ BlockCollection bc = getBlockCollection(storedBlock);
+ final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+ bc.getStoragePolicyID());
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
+ final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
+ final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
+ final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
- final Map<String, List<DatanodeStorageInfo>> rackMap
- = new HashMap<String, List<DatanodeStorageInfo>>();
- final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
- final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
-
// split nodes into two sets
// moreThanOne contains nodes on rack with more than one replica
// exactlyOne contains the remaining nodes
replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
-
+
// pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty
// otherwise one node with least space from remains
@@ -3021,7 +3072,7 @@ public class BlockManager {
moreThanOne, excessTypes)) {
cur = delNodeHintStorage;
} else { // regular excessive replica removal
- cur = replicator.chooseReplicaToDelete(bc, b, replication,
+ cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication,
moreThanOne, exactlyOne, excessTypes);
}
firstOne = false;
@@ -3030,24 +3081,29 @@ public class BlockManager {
replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
exactlyOne, cur);
- nonExcess.remove(cur);
- addToExcessReplicate(cur.getDatanodeDescriptor(), b);
-
- //
- // The 'excessblocks' tracks blocks until we get confirmation
- // that the datanode has deleted them; the only way we remove them
- // is when we get a "removeBlock" message.
- //
- // The 'invalidate' list is used to inform the datanode the block
- // should be deleted. Items are removed from the invalidate list
- // upon giving instructions to the namenode.
- //
- addToInvalidates(b, cur.getDatanodeDescriptor());
- blockLog.info("BLOCK* chooseExcessReplicates: "
- +"({}, {}) is added to invalidated blocks set", cur, b);
+ processChosenExcessReplica(nonExcess, cur, storedBlock);
}
}
+ private void processChosenExcessReplica(
+ final Collection<DatanodeStorageInfo> nonExcess,
+ final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
+ nonExcess.remove(chosen);
+ addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock);
+ //
+ // The 'excessblocks' tracks blocks until we get confirmation
+ // that the datanode has deleted them; the only way we remove them
+ // is when we get a "removeBlock" message.
+ //
+ // The 'invalidate' list is used to inform the datanode the block
+ // should be deleted. Items are removed from the invalidate list
+ // upon giving instructions to the datanodes.
+ //
+ addToInvalidates(storedBlock, chosen.getDatanodeDescriptor());
+ blockLog.info("BLOCK* chooseExcessReplicates: "
+ +"({}, {}) is added to invalidated blocks set", chosen, storedBlock);
+ }
+
/** Check if we can use delHint */
static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
@@ -3069,17 +3125,18 @@ public class BlockManager {
}
}
- private void addToExcessReplicate(DatanodeInfo dn, Block block) {
+ private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
assert namesystem.hasWriteLock();
- LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
+ LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+ dn.getDatanodeUuid());
if (excessBlocks == null) {
- excessBlocks = new LightWeightLinkedSet<Block>();
+ excessBlocks = new LightWeightLinkedSet<>();
excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
}
- if (excessBlocks.add(block)) {
+ if (excessBlocks.add(storedBlock)) {
excessBlocksCount.incrementAndGet();
blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to"
- + " excessReplicateMap", dn, block);
+ + " excessReplicateMap", dn, storedBlock);
}
}
@@ -3091,26 +3148,26 @@ public class BlockManager {
QUEUE_REASON_FUTURE_GENSTAMP);
return;
}
- removeStoredBlock(block, node);
+ removeStoredBlock(getStoredBlock(block), node);
}
/**
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
* removed block is still valid.
*/
- public void removeStoredBlock(Block block, DatanodeDescriptor node) {
- blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
+ public void removeStoredBlock(BlockInfo storedBlock,
+ DatanodeDescriptor node) {
+ blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
assert (namesystem.hasWriteLock());
{
- BlockInfo storedBlock = getStoredBlock(block);
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
- " removed from node {}", block, node);
+ " removed from node {}", storedBlock, node);
return;
}
CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
- .get(new CachedBlock(block.getBlockId(), (short) 0, false));
+ .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false));
if (cblock != null) {
boolean removed = false;
removed |= node.getPendingCached().remove(cblock);
@@ -3118,7 +3175,7 @@ public class BlockManager {
removed |= node.getPendingUncached().remove(cblock);
if (removed) {
blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
- + "related lists on node {}", block, node);
+ + "related lists on node {}", storedBlock, node);
}
}
@@ -3128,7 +3185,7 @@ public class BlockManager {
// necessary. In that case, put block on a possibly-will-
// be-replicated list.
//
- BlockCollection bc = blocksMap.getBlockCollection(block);
+ BlockCollection bc = storedBlock.getBlockCollection();
if (bc != null) {
namesystem.decrementSafeBlockCount(storedBlock);
updateNeededReplications(storedBlock, -1, 0);
@@ -3138,13 +3195,13 @@ public class BlockManager {
// We've removed a block from a node, so it's definitely no longer
// in "excess" there.
//
- LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
- .getDatanodeUuid());
+ LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
+ node.getDatanodeUuid());
if (excessBlocks != null) {
- if (excessBlocks.remove(block)) {
+ if (excessBlocks.remove(storedBlock)) {
excessBlocksCount.decrementAndGet();
blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
- "excessBlocks", block);
+ "excessBlocks", storedBlock);
if (excessBlocks.size() == 0) {
excessReplicateMap.remove(node.getDatanodeUuid());
}
@@ -3152,7 +3209,7 @@ public class BlockManager {
}
// Remove the replica from corruptReplicas
- corruptReplicas.removeFromCorruptReplicasMap(block, node);
+ corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node);
}
}
@@ -3160,7 +3217,7 @@ public class BlockManager {
* Get all valid locations of the block & add the block to results
* return the length of the added block; 0 if the block is not added
*/
- private long addBlock(Block block, List<BlockWithLocations> results) {
+ private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
final List<DatanodeStorageInfo> locations = getValidLocations(block);
if(locations.size() == 0) {
return 0;
@@ -3212,31 +3269,32 @@ public class BlockManager {
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
}
-
+
private void processAndHandleReportedBlock(
DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
- Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
- Collection<Block> toInvalidate = new LinkedList<Block>();
- Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
- Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+ Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
+ Collection<Block> toInvalidate = new LinkedList<>();
+ Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
+ Collection<StatefulBlockInfo> toUC = new LinkedList<>();
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
- processReportedBlock(storageInfo, block, reportedState,
- toAdd, toInvalidate, toCorrupt, toUC);
+ processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
+ toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
- : "The block should be only in one of the lists.";
+ : "The block should be only in one of the lists.";
- for (StatefulBlockInfo b : toUC) {
+ for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, storageInfo);
}
long numBlocksLogged = 0;
- for (BlockInfo b : toAdd) {
- addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+ for (BlockInfoToAdd b : toAdd) {
+ addStoredBlock(b.getStored(), b.getReported(), storageInfo, delHintNode,
+ numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -3301,7 +3359,7 @@ public class BlockManager {
ReplicaState.RBW, null);
break;
default:
- String msg =
+ String msg =
"Unknown block status code reported by " + nodeID +
": " + rdbi;
blockLog.warn(msg);
@@ -3337,8 +3395,8 @@ public class BlockManager {
} else if (node.isDecommissioned()) {
decommissioned++;
} else {
- LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
- .getDatanodeUuid());
+ LightWeightLinkedSet<BlockInfo> blocksExcess =
+ excessReplicateMap.get(node.getDatanodeUuid());
if (blocksExcess != null && blocksExcess.contains(b)) {
excess++;
} else {
@@ -3391,13 +3449,13 @@ public class BlockManager {
int numOverReplicated = 0;
while(it.hasNext()) {
final BlockInfo block = it.next();
- BlockCollection bc = blocksMap.getBlockCollection(block);
- short expectedReplication = bc.getPreferredBlockReplication();
+ int expectedReplication = this.getReplication(block);
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
if (numCurrentReplica > expectedReplication) {
// over-replicated block
- processOverReplicatedBlock(block, expectedReplication, null, null);
+ processOverReplicatedBlock(block, (short) expectedReplication, null,
+ null);
numOverReplicated++;
}
}
@@ -3423,7 +3481,7 @@ public class BlockManager {
if (pendingReplicationBlocksCount == 0 &&
underReplicatedBlocksCount == 0) {
LOG.info("Node {} is dead and there are no under-replicated" +
- " blocks or blocks pending replication. Safe to decommission.",
+ " blocks or blocks pending replication. Safe to decommission.",
node);
return true;
}
@@ -3441,6 +3499,12 @@ public class BlockManager {
return blocksMap.size();
}
+
+ /** @return an iterable over the storages of the given block. */
+ public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
+ return blocksMap.getStorages(block);
+ }
+
public DatanodeStorageInfo[] getStorages(BlockInfo block) {
final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
int i = 0;
@@ -3529,10 +3593,12 @@ public class BlockManager {
final BlockInfoUnderConstruction uc =
(BlockInfoUnderConstruction)b;
final int numNodes = b.numNodes();
- LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
- + uc.getBlockUCState() + ", replication# = " + numNodes
- + (numNodes < minReplication ? " < ": " >= ")
- + " minimum = " + minReplication + ") in file " + src);
+ final int min = getMinStorageNum(b);
+ final BlockUCState state = b.getBlockUCState();
+ LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + state
+ + ", replication# = " + numNodes
+ + (numNodes < min ? " < " : " >= ")
+ + " minimum = " + min + ") in file " + src);
return false;
}
}
@@ -3543,15 +3609,15 @@ public class BlockManager {
* @return 0 if the block is not found;
* otherwise, return the replication factor of the block.
*/
- private int getReplication(Block block) {
+ private int getReplication(BlockInfo block) {
final BlockCollection bc = blocksMap.getBlockCollection(block);
- return bc == null? 0: bc.getPreferredBlockReplication();
+ return bc == null? 0: getExpectedReplicaNum(bc, block);
}
/**
- * Get blocks to invalidate for <i>nodeId</i>
- * in {@link #invalidateBlocks}.
+ * Get blocks to invalidate for <i>nodeId</i>
+ * in {@link #invalidateBlocks}.
*
* @return number of blocks scheduled for removal during this iteration.
*/
@@ -3589,22 +3655,20 @@ public class BlockManager {
return toInvalidate.size();
}
- boolean blockHasEnoughRacks(Block b) {
+ boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
if (!this.shouldCheckForEnoughRacks) {
return true;
}
- boolean enoughRacks = false;;
- Collection<DatanodeDescriptor> corruptNodes =
- corruptReplicas.getNodes(b);
- int numExpectedReplicas = getReplication(b);
+ boolean enoughRacks = false;
+ Collection<DatanodeDescriptor> corruptNodes =
+ corruptReplicas.getNodes(storedBlock);
String rackName = null;
- for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+ for(DatanodeStorageInfo storage : getStorages(storedBlock)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
- if (numExpectedReplicas == 1 ||
- (numExpectedReplicas > 1 &&
- !datanodeManager.hasClusterEverBeenMultiRack())) {
+ if (expectedStorageNum == 1 || (expectedStorageNum > 1 &&
+ !datanodeManager.hasClusterEverBeenMultiRack())) {
enoughRacks = true;
break;
}
@@ -3625,8 +3689,13 @@ public class BlockManager {
* A block needs replication if the number of replicas is less than expected
* or if it does not have enough racks.
*/
- boolean isNeededReplication(Block b, int expected, int current) {
- return current < expected || !blockHasEnoughRacks(b);
+ boolean isNeededReplication(BlockInfo storedBlock, int expected,
+ int current) {
+ return current < expected || !blockHasEnoughRacks(storedBlock, expected);
+ }
+
+ public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) {
+ return bc.getPreferredBlockReplication();
}
public long getMissingBlocksCount() {
@@ -3648,11 +3717,6 @@ public class BlockManager {
return blocksMap.getBlockCollection(b);
}
- /** @return an iterator of the datanodes. */
- public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
- return blocksMap.getStorages(block);
- }
-
public int numCorruptReplicas(Block block) {
return corruptReplicas.numCorruptReplicas(block);
}
@@ -3668,9 +3732,10 @@ public class BlockManager {
* If a block is removed from blocksMap, remove it from excessReplicateMap.
*/
private void removeFromExcessReplicateMap(Block block) {
- for (DatanodeStorageInfo info : blocksMap.getStorages(block)) {
+ for (DatanodeStorageInfo info : getStorages(block)) {
String uuid = info.getDatanodeDescriptor().getDatanodeUuid();
- LightWeightLinkedSet<Block> excessReplicas = excessReplicateMap.get(uuid);
+ LightWeightLinkedSet<BlockInfo> excessReplicas =
+ excessReplicateMap.get(uuid);
if (excessReplicas != null) {
if (excessReplicas.remove(block)) {
excessBlocksCount.decrementAndGet();
@@ -3685,26 +3750,6 @@ public class BlockManager {
public int getCapacity() {
return blocksMap.getCapacity();
}
-
- /**
- * Return a range of corrupt replica block ids. Up to numExpectedBlocks
- * blocks starting at the next block after startingBlockId are returned
- * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
- * is null, up to numExpectedBlocks blocks are returned from the beginning.
- * If startingBlockId cannot be found, null is returned.
- *
- * @param numExpectedBlocks Number of block ids to return.
- * 0 <= numExpectedBlocks <= 100
- * @param startingBlockId Block id from which to start. If null, start at
- * beginning.
- * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
- *
- */
- public long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
- Long startingBlockId) {
- return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks,
- startingBlockId);
- }
/**
* Return an iterator over the set of blocks for which there are no replicas.
@@ -3879,7 +3924,7 @@ public class BlockManager {
/**
* A simple result enum for the result of
- * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
+ * {@link BlockManager#processMisReplicatedBlock}.
*/
enum MisReplicationResult {
/** The block should be invalidated since it belongs to a deleted file. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 57e81b4..65b83e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -24,6 +24,7 @@ import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -226,7 +227,7 @@ public class DatanodeStorageInfo {
return blockPoolUsed;
}
- public AddBlockResult addBlock(BlockInfo b) {
+ public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
// First check whether the block belongs to a different storage
// on the same DN.
AddBlockResult result = AddBlockResult.ADDED;
@@ -245,10 +246,18 @@ public class DatanodeStorageInfo {
}
// add to the head of the data-node list
- b.addStorage(this);
+ b.addStorage(this, reportedBlock);
+ insertToList(b);
+ return result;
+ }
+
+ AddBlockResult addBlock(BlockInfo b) {
+ return addBlock(b, b);
+ }
+
+ public void insertToList(BlockInfo b) {
blockList = b.listInsert(blockList, this);
numBlocks++;
- return result;
}
public boolean removeBlock(BlockInfo b) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b073a89..d0f4e08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -143,7 +143,6 @@ import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
@@ -3136,7 +3135,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (trackBlockCounts) {
if (b.isComplete()) {
numRemovedComplete++;
- if (blockManager.checkMinReplication(b)) {
+ if (blockManager.hasMinStorage(b)) {
numRemovedSafe++;
}
}
@@ -3368,7 +3367,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
curBlock = blocks[nrCompleteBlocks];
if(!curBlock.isComplete())
break;
- assert blockManager.checkMinReplication(curBlock) :
+ assert blockManager.hasMinStorage(curBlock) :
"A COMPLETE block is not minimally replicated in " + src;
}
@@ -3404,7 +3403,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// If penultimate block doesn't exist then its minReplication is met
boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
- blockManager.checkMinReplication(penultimateBlock);
+ blockManager.hasMinStorage(penultimateBlock);
switch(lastBlockState) {
case COMPLETE:
@@ -3413,7 +3412,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
case COMMITTED:
// Close file if committed blocks are minimally replicated
if(penultimateBlockMinReplication &&
- blockManager.checkMinReplication(lastBlock)) {
+ blockManager.hasMinStorage(lastBlock)) {
finalizeINodeFileUnderConstruction(src, pendingFile,
iip.getLatestSnapshotId());
NameNode.stateChangeLog.warn("BLOCK*"
@@ -3705,9 +3704,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
if (storageInfo != null) {
if(copyTruncate) {
- storageInfo.addBlock(truncatedBlock);
+ storageInfo.addBlock(truncatedBlock, truncatedBlock);
} else {
- storageInfo.addBlock(storedBlock);
+ storageInfo.addBlock(storedBlock, storedBlock);
}
}
}
@@ -3723,8 +3722,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} else {
iFile.setLastBlock(storedBlock, trimmedStorageInfos);
if (closeFile) {
- blockManager.markBlockReplicasAsCorrupt(storedBlock,
- oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
+ blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
+ storedBlock, oldGenerationStamp, oldNumBytes,
+ trimmedStorageInfos);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 7d4cd7e..ab179b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -647,7 +647,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
.getStorageType()));
}
if (showReplicaDetails) {
- LightWeightLinkedSet<Block> blocksExcess =
+ LightWeightLinkedSet<BlockInfo> blocksExcess =
bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
Collection<DatanodeDescriptor> corruptReplicas =
bm.getCorruptReplicas(block.getLocalBlock());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index 5126aa7..bae4f1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -63,7 +63,7 @@ public class TestBlockInfo {
final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
- boolean added = blockInfo.addStorage(storage);
+ boolean added = blockInfo.addStorage(storage, blockInfo);
Assert.assertTrue(added);
Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 396dff3..9e31670 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -383,7 +383,7 @@ public class TestBlockManager {
for (int i = 1; i < pipeline.length; i++) {
DatanodeStorageInfo storage = pipeline[i];
bm.addBlock(storage, blockInfo, null);
- blockInfo.addStorage(storage);
+ blockInfo.addStorage(storage, blockInfo);
}
}
@@ -393,7 +393,7 @@ public class TestBlockManager {
for (DatanodeDescriptor dn : nodes) {
for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
- blockInfo.addStorage(storage);
+ blockInfo.addStorage(storage, blockInfo);
}
}
return blockInfo;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
index 1c3f075..c33667d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
@@ -100,7 +100,7 @@ public class TestNodeCount {
DatanodeDescriptor nonExcessDN = null;
for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
- Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
+ Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
nonExcessDN = dn;
break;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index 2d7bb44..83b3aa0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -42,7 +41,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.util.Time;
import org.junit.Test;
public class TestOverReplicatedBlocks {
@@ -185,7 +183,7 @@ public class TestOverReplicatedBlocks {
// All replicas for deletion should be scheduled on lastDN.
// And should not actually be deleted, because lastDN does not heartbeat.
namesystem.readLock();
- Collection<Block> dnBlocks =
+ Collection<BlockInfo> dnBlocks =
namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
assertEquals("Replicas on node " + lastDNid + " should have been deleted",
SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dec68e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 2812957..44f0e65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -1250,7 +1250,7 @@ public class TestReplicationPolicy {
when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
when(storage.addBlock(any(BlockInfo.class))).thenReturn
(DatanodeStorageInfo.AddBlockResult.ADDED);
- ucBlock.addStorage(storage);
+ ucBlock.addStorage(storage, ucBlock);
when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
.thenReturn(ucBlock);