Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/30 17:42:14 UTC
[14/50] [abbrv] hadoop git commit: Merge remote-tracking branch
'apache/trunk' into HDFS-7285
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6c6d758,1346ab3..8232ab9
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@@ -674,8 -648,8 +674,8 @@@ public class BlockManager implements Bl
return false; // already completed (e.g. by syncBlock)
final boolean b = commitBlock(lastBlock, commitBlock);
- if (hasMinStorage(lastBlock)) {
- completeBlock(bc, bc.numBlocks() - 1, false);
- if (countNodes(lastBlock).liveReplicas() >= minReplication) {
++ if (hasMinStorage(lastBlock)) {
+ completeBlock(lastBlock, false);
}
return b;
}
@@@ -698,9 -666,9 +692,9 @@@
}
int numNodes = curBlock.numNodes();
- if (!force && numNodes < minReplication) {
+ if (!force && !hasMinStorage(curBlock, numNodes)) {
- throw new IOException("Cannot complete block: " +
- "block does not satisfy minimal replication requirement.");
+ throw new IOException("Cannot complete block: "
+ + "block does not satisfy minimal replication requirement.");
}
if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
throw new IOException(
@@@ -718,26 -683,10 +709,12 @@@
// a "forced" completion when a file is getting closed by an
// OP_CLOSE edit on the standby).
namesystem.adjustSafeModeBlockTotals(0, 1);
+ final int minStorage = curBlock.isStriped() ?
+ ((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication;
namesystem.incrementSafeBlockCount(
- Math.min(numNodes, minReplication));
+ Math.min(numNodes, minStorage), curBlock);
-
- // replace block in the blocksMap
- return blocksMap.replaceBlock(completeBlock);
}
- private BlockInfo completeBlock(final BlockCollection bc,
- final BlockInfo block, boolean force) throws IOException {
- BlockInfo[] fileBlocks = bc.getBlocks();
- for (int idx = 0; idx < fileBlocks.length; idx++) {
- if (fileBlocks[idx] == block) {
- return completeBlock(bc, idx, force);
- }
- }
- return block;
- }
-
/**
* Force the given block in the given file to be marked as complete,
* regardless of whether enough replicas are present. This is necessary
@@@ -1270,37 -1162,29 +1245,36 @@@
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
DatanodeStorageInfo storageInfo,
DatanodeDescriptor node) throws IOException {
--
- if (b.stored.isDeleted()) {
- if (b.getCorrupted().isDeleted()) {
++ if (b.getStored().isDeleted()) {
blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
" corrupt as it does not belong to any file", b);
- addToInvalidates(b.corrupted, node);
+ addToInvalidates(b.getCorrupted(), node);
return;
- }
- short expectedReplicas = b.getCorrupted().getReplication();
+ }
+ short expectedReplicas =
- getExpectedReplicaNum(b.stored);
++ getExpectedReplicaNum(b.getStored());
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
- storageInfo.addBlock(b.stored, b.corrupted);
- storageInfo.addBlock(b.getStored());
++ storageInfo.addBlock(b.getStored(), b.getCorrupted());
}
- // Add this replica to corruptReplicas Map
- corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node,
- b.getReason(), b.getReasonCode());
+ // Add this replica to corruptReplicas Map. For striped blocks, we always
+ // use the id of whole striped block group when adding to corruptReplicas
- Block corrupted = new Block(b.corrupted);
- if (b.stored.isStriped()) {
- corrupted.setBlockId(b.stored.getBlockId());
++ Block corrupted = new Block(b.getCorrupted());
++ if (b.getStored().isStriped()) {
++ corrupted.setBlockId(b.getStored().getBlockId());
+ }
- corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.reason,
- b.reasonCode);
++ corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.getReason(),
++ b.getReasonCode());
- NumberReplicas numberOfReplicas = countNodes(b.stored);
+ NumberReplicas numberOfReplicas = countNodes(b.getStored());
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
expectedReplicas;
- boolean minReplicationSatisfied =
- numberOfReplicas.liveReplicas() >= minReplication;
+
- boolean minReplicationSatisfied = hasMinStorage(b.stored,
++ boolean minReplicationSatisfied = hasMinStorage(b.getStored(),
+ numberOfReplicas.liveReplicas());
+
boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
expectedReplicas;
@@@ -1315,10 -1199,10 +1289,10 @@@
if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
|| corruptedDuringWrite) {
// the block is over-replicated so invalidate the replicas immediately
- invalidateBlock(b, node);
+ invalidateBlock(b, node, numberOfReplicas);
} else if (namesystem.isPopulatingReplQueues()) {
// add the block to neededReplication
- updateNeededReplications(b.stored, -1, 0);
+ updateNeededReplications(b.getStored(), -1, 0);
}
}
@@@ -1342,13 -1227,12 +1316,13 @@@
"invalidation of {} on {} because {} replica(s) are located on " +
"nodes with potentially out-of-date block reports", b, dn,
nr.replicasOnStaleNodes());
- postponeBlock(b.corrupted);
+ postponeBlock(b.getCorrupted());
return false;
- } else if (nr.liveReplicas() >= 1) {
- // If we have at least one copy on a live node, then we can delete it.
+ } else {
+ // we already checked the number of replicas in the caller of this
+ // function and know there are enough live replicas, so we can delete it.
- addToInvalidates(b.corrupted, dn);
- removeStoredBlock(b.stored, node);
+ addToInvalidates(b.getCorrupted(), dn);
+ removeStoredBlock(b.getStored(), node);
blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
b, dn);
return true;
@@@ -1446,72 -1326,11 +1420,11 @@@
namesystem.writeLock();
try {
synchronized (neededReplications) {
- for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
- for (BlockInfo block : blocksToReplicate.get(priority)) {
- ReplicationWork rw = scheduleReplication(block, priority);
+ for (int priority = 0; priority < blocksToRecover.size(); priority++) {
+ for (BlockInfo block : blocksToRecover.get(priority)) {
- // block should belong to a file
- bc = getBlockCollection(block);
- // abandoned block or block reopened for append
- if (bc == null
- || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
- // remove from neededReplications
- neededReplications.remove(block, priority);
- continue;
- }
-
- requiredReplication = getExpectedReplicaNum(block);
-
- // get a source data-node
- containingNodes = new ArrayList<>();
- List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
- NumberReplicas numReplicas = new NumberReplicas();
- List<Short> liveBlockIndices = new ArrayList<>();
- final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
- containingNodes, liveReplicaNodes, numReplicas,
- liveBlockIndices, priority);
- if(srcNodes == null || srcNodes.length == 0) {
- // block can not be replicated from any node
- LOG.debug("Block " + block + " cannot be recovered " +
- "from any node");
- continue;
- }
-
- // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
- // not included in the numReplicas.liveReplicas() count
- assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
-
- // do not schedule more if enough replicas is already pending
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
-
- if (numEffectiveReplicas >= requiredReplication) {
- if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block, requiredReplication)) ) {
- neededReplications.remove(block, priority); // remove from neededReplications
- blockLog.debug("BLOCK* Removing {} from neededReplications as" +
- " it has enough replicas", block);
- continue;
- }
- }
-
- if (numReplicas.liveReplicas() < requiredReplication) {
- additionalReplRequired = requiredReplication
- - numEffectiveReplicas;
- } else {
- additionalReplRequired = 1; // Needed on a new rack
- }
- if (block.isStriped()) {
- short[] indices = new short[liveBlockIndices.size()];
- for (int i = 0 ; i < liveBlockIndices.size(); i++) {
- indices[i] = liveBlockIndices.get(i);
- }
- ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes,
- containingNodes, liveReplicaNodes, additionalReplRequired,
- priority, indices);
- recovWork.add(ecw);
- } else {
- recovWork.add(new ReplicationWork(block, bc, srcNodes,
- containingNodes, liveReplicaNodes, additionalReplRequired,
- priority));
++ BlockRecoveryWork rw = scheduleRecovery(block, priority);
+ if (rw != null) {
- work.add(rw);
++ recovWork.add(rw);
}
}
}
@@@ -1520,9 -1339,8 +1433,9 @@@
namesystem.writeUnlock();
}
+ // Step 2: choose target nodes for each recovery task
- final Set<Node> excludedNodes = new HashSet<Node>();
+ final Set<Node> excludedNodes = new HashSet<>();
- for(ReplicationWork rw : work){
+ for(BlockRecoveryWork rw : recovWork){
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
excludedNodes.clear();
@@@ -1533,101 -1351,21 +1446,24 @@@
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
// It is costly to extract the filename for which chooseTargets is called,
// so for now we pass in the block collection itself.
- rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
+ final BlockPlacementPolicy placementPolicy =
- placementPolicies.getPolicy(rw.block.isStriped());
++ placementPolicies.getPolicy(rw.getBlock().isStriped());
+ rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
}
+ // Step 3: add tasks to the DN
namesystem.writeLock();
try {
- for(ReplicationWork rw : work){
+ for(BlockRecoveryWork rw : recovWork){
- final DatanodeStorageInfo[] targets = rw.targets;
+ final DatanodeStorageInfo[] targets = rw.getTargets();
if(targets == null || targets.length == 0){
- rw.targets = null;
+ rw.resetTargets();
continue;
}
synchronized (neededReplications) {
- BlockInfo block = rw.block;
- int priority = rw.priority;
- // Recheck since global lock was released
- // block should belong to a file
- bc = getBlockCollection(block);
- // abandoned block or block reopened for append
- if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
- neededReplications.remove(block, priority); // remove from neededReplications
- rw.targets = null;
- continue;
- }
- requiredReplication = getExpectedReplicaNum(block);
-
- // do not schedule more if enough replicas is already pending
- NumberReplicas numReplicas = countNodes(block);
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
-
- if (numEffectiveReplicas >= requiredReplication) {
- if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block, requiredReplication)) ) {
- neededReplications.remove(block, priority); // remove from neededReplications
- rw.targets = null;
- blockLog.debug("BLOCK* Removing {} from neededReplications as" +
- " it has enough replicas", block);
- continue;
- }
- }
-
- if ( (numReplicas.liveReplicas() >= requiredReplication) &&
- (!blockHasEnoughRacks(block, requiredReplication)) ) {
- if (rw.srcNodes[0].getNetworkLocation().equals(
- targets[0].getDatanodeDescriptor().getNetworkLocation())) {
- //No use continuing, unless a new rack in this case
- continue;
- }
- }
-
- // Add block to the to be replicated list
- if (block.isStriped()) {
- assert rw instanceof ErasureCodingWork;
- assert rw.targets.length > 0;
- String src = getBlockCollection(block).getName();
- ErasureCodingZone ecZone = null;
- try {
- ecZone = namesystem.getErasureCodingZoneForPath(src);
- } catch (IOException e) {
- blockLog
- .warn("Failed to get the EC zone for the file {} ", src);
- }
- if (ecZone == null) {
- blockLog.warn("No erasure coding policy found for the file {}. "
- + "So cannot proceed for recovery", src);
- // TODO: we may have to revisit later for what we can do better to
- // handle this case.
- continue;
- }
- rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
- new ExtendedBlock(namesystem.getBlockPoolId(), block),
- rw.srcNodes, rw.targets,
- ((ErasureCodingWork) rw).liveBlockIndicies,
- ecZone.getErasureCodingPolicy());
- } else {
- rw.srcNodes[0].addBlockToBeReplicated(block, targets);
- }
- scheduledWork++;
- DatanodeStorageInfo.incrementBlocksScheduled(targets);
-
- // Move the block-replication into a "pending" state.
- // The reason we use 'pending' is so we can retry
- // replications that fail after an appropriate amount of time.
- pendingReplications.increment(block,
- DatanodeStorageInfo.toDatanodeDescriptors(targets));
- blockLog.debug("BLOCK* block {} is moved from neededReplications to "
- + "pendingReplications", block);
-
- // remove from neededReplications
- if(numEffectiveReplicas + targets.length >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- if (validateReplicationWork(rw)) {
++ if (validateRecoveryWork(rw)) {
+ scheduledWork++;
}
}
}
@@@ -1637,16 -1375,16 +1473,16 @@@
if (blockLog.isInfoEnabled()) {
// log which blocks have been scheduled for replication
- for(ReplicationWork rw : work){
+ for(BlockRecoveryWork rw : recovWork){
- DatanodeStorageInfo[] targets = rw.targets;
+ DatanodeStorageInfo[] targets = rw.getTargets();
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
for (DatanodeStorageInfo target : targets) {
targetList.append(' ');
targetList.append(target.getDatanodeDescriptor());
}
- blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
- rw.block, targetList);
- blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(),
++ blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNodes(),
+ rw.getBlock(), targetList);
}
}
}
@@@ -1658,6 -1396,118 +1494,160 @@@
return scheduledWork;
}
+ boolean hasEnoughEffectiveReplicas(BlockInfo block,
+ NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+ int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
+ return (numEffectiveReplicas >= required) &&
- (pendingReplicaNum > 0 || blockHasEnoughRacks(block));
++ (pendingReplicaNum > 0 || blockHasEnoughRacks(block, required));
+ }
+
- private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
++ private BlockRecoveryWork scheduleRecovery(BlockInfo block, int priority) {
+ // block should belong to a file
+ BlockCollection bc = getBlockCollection(block);
+ // abandoned block or block reopened for append
+ if (bc == null
+ || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+ // remove from neededReplications
+ neededReplications.remove(block, priority);
+ return null;
+ }
+
+ short requiredReplication = getExpectedReplicaNum(block);
+
+ // get a source data-node
+ List<DatanodeDescriptor> containingNodes = new ArrayList<>();
+ List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
+ NumberReplicas numReplicas = new NumberReplicas();
- DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes,
- liveReplicaNodes, numReplicas, priority);
- if (srcNode == null) { // block can not be replicated from any node
- LOG.debug("Block " + block + " cannot be repl from any node");
++ List<Short> liveBlockIndices = new ArrayList<>();
++ final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
++ containingNodes, liveReplicaNodes, numReplicas,
++ liveBlockIndices, priority);
++ if(srcNodes == null || srcNodes.length == 0) {
++ // block can not be recovered from any node
++ LOG.debug("Block " + block + " cannot be recovered " +
++ "from any node");
+ return null;
+ }
+
+ // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
+ // not included in the numReplicas.liveReplicas() count
+ assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
+
+ int pendingNum = pendingReplications.getNumReplicas(block);
+ if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+ requiredReplication)) {
+ neededReplications.remove(block, priority);
+ blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+ " it has enough replicas", block);
+ return null;
+ }
+
+ final int additionalReplRequired;
+ if (numReplicas.liveReplicas() < requiredReplication) {
+ additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
+ - pendingNum;
+ } else {
+ additionalReplRequired = 1; // Needed on a new rack
+ }
- return new ReplicationWork(block, bc, srcNode, containingNodes,
- liveReplicaNodes, additionalReplRequired, priority);
++
++ if (block.isStriped()) {
++ short[] indices = new short[liveBlockIndices.size()];
++ for (int i = 0 ; i < liveBlockIndices.size(); i++) {
++ indices[i] = liveBlockIndices.get(i);
++ }
++ return new ErasureCodingWork(block, bc, srcNodes,
++ containingNodes, liveReplicaNodes, additionalReplRequired,
++ priority, indices);
++ } else {
++ return new ReplicationWork(block, bc, srcNodes,
++ containingNodes, liveReplicaNodes, additionalReplRequired,
++ priority);
++ }
+ }
+
- private boolean validateReplicationWork(ReplicationWork rw) {
++ private boolean validateRecoveryWork(BlockRecoveryWork rw) {
+ BlockInfo block = rw.getBlock();
+ int priority = rw.getPriority();
+ // Recheck since global lock was released
+ // block should belong to a file
+ BlockCollection bc = getBlockCollection(block);
+ // abandoned block or block reopened for append
+ if (bc == null
+ || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+ neededReplications.remove(block, priority);
+ rw.resetTargets();
+ return false;
+ }
+
+ // do not schedule more if enough replicas is already pending
+ final short requiredReplication = getExpectedReplicaNum(block);
+ NumberReplicas numReplicas = countNodes(block);
+ final int pendingNum = pendingReplications.getNumReplicas(block);
+ if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+ requiredReplication)) {
+ neededReplications.remove(block, priority);
+ rw.resetTargets();
+ blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+ " it has enough replicas", block);
+ return false;
+ }
+
+ DatanodeStorageInfo[] targets = rw.getTargets();
+ if ( (numReplicas.liveReplicas() >= requiredReplication) &&
- (!blockHasEnoughRacks(block)) ) {
- if (rw.getSrcNode().getNetworkLocation().equals(
++ (!blockHasEnoughRacks(block, requiredReplication)) ) {
++ if (rw.getSrcNodes()[0].getNetworkLocation().equals(
+ targets[0].getDatanodeDescriptor().getNetworkLocation())) {
+ //No use continuing, unless a new rack in this case
+ return false;
+ }
+ }
+
- // Add block to the to be replicated list
- rw.getSrcNode().addBlockToBeReplicated(block, targets);
++ // Add block to the to be recovered list
++ if (block.isStriped()) {
++ assert rw instanceof ErasureCodingWork;
++ assert rw.getTargets().length > 0;
++ String src = getBlockCollection(block).getName();
++ ErasureCodingZone ecZone = null;
++ try {
++ ecZone = namesystem.getErasureCodingZoneForPath(src);
++ } catch (IOException e) {
++ blockLog
++ .warn("Failed to get the EC zone for the file {} ", src);
++ }
++ if (ecZone == null) {
++ blockLog.warn("No erasure coding policy found for the file {}. "
++ + "So cannot proceed for recovery", src);
++ // TODO: we may have to revisit later for what we can do better to
++ // handle this case.
++ return false;
++ }
++ rw.getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
++ new ExtendedBlock(namesystem.getBlockPoolId(), block),
++ rw.getSrcNodes(), rw.getTargets(),
++ ((ErasureCodingWork) rw).getLiveBlockIndicies(),
++ ecZone.getErasureCodingPolicy());
++ } else {
++ rw.getSrcNodes()[0].addBlockToBeReplicated(block, targets);
++ }
++
+ DatanodeStorageInfo.incrementBlocksScheduled(targets);
+
+ // Move the block-replication into a "pending" state.
+ // The reason we use 'pending' is so we can retry
+ // replications that fail after an appropriate amount of time.
+ pendingReplications.increment(block,
+ DatanodeStorageInfo.toDatanodeDescriptors(targets));
+ blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+ + "pendingReplications", block);
+
+ int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
+ // remove from neededReplications
+ if(numEffectiveReplicas + targets.length >= requiredReplication) {
+ neededReplications.remove(block, priority);
+ }
+ return true;
+ }
+
/** Choose target for WebHDFS redirection. */
public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
@@@ -1916,59 -1755,7 +1906,17 @@@
}
}
+ private static class BlockInfoToAdd {
+ final BlockInfo stored;
+ final Block reported;
+
+ BlockInfoToAdd(BlockInfo stored, Block reported) {
+ this.stored = stored;
+ this.reported = reported;
+ }
+ }
+
/**
- * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
- * list of blocks that should be considered corrupt due to a block report.
- */
- private static class BlockToMarkCorrupt {
- /**
- * The corrupted block in a datanode. This is the one reported by the
- * datanode.
- */
- final Block corrupted;
- /** The corresponding block stored in the BlockManager. */
- final BlockInfo stored;
- /** The reason to mark corrupt. */
- final String reason;
- /** The reason code to be stored */
- final Reason reasonCode;
-
- BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason,
- Reason reasonCode) {
- Preconditions.checkNotNull(corrupted, "corrupted is null");
- Preconditions.checkNotNull(stored, "stored is null");
-
- this.corrupted = corrupted;
- this.stored = stored;
- this.reason = reason;
- this.reasonCode = reasonCode;
- }
-
- BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs,
- String reason, Reason reasonCode) {
- this(corrupted, stored, reason, reasonCode);
- //the corrupted block in datanode has a different generation stamp
- corrupted.setGenerationStamp(gs);
- }
-
- @Override
- public String toString() {
- return corrupted + "("
- + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
- }
- }
-
- /**
* The given storage is reporting all its blocks.
* Update the (storage-->block list) and (block-->storage list) maps.
*
@@@ -2721,8 -2484,8 +2669,8 @@@
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
- && numCurrentReplica >= minReplication) {
+ && hasMinStorage(storedBlock, numCurrentReplica)) {
- completeBlock(getBlockCollection(storedBlock), storedBlock, false);
+ completeBlock(storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
// only complete blocks are counted towards that.
@@@ -2796,8 -2558,8 +2744,8 @@@
+ pendingReplications.getNumReplicas(storedBlock);
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
- numLiveReplicas >= minReplication) {
+ hasMinStorage(storedBlock, numLiveReplicas)) {
- storedBlock = completeBlock(bc, storedBlock, false);
+ completeBlock(storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
@@@ -4171,138 -3740,7 +4119,32 @@@
null);
}
+ public static LocatedStripedBlock newLocatedStripedBlock(
+ ExtendedBlock b, DatanodeStorageInfo[] storages,
+ int[] indices, long startOffset, boolean corrupt) {
+ // startOffset is unknown
+ return new LocatedStripedBlock(
+ b, DatanodeStorageInfo.toDatanodeInfos(storages),
+ DatanodeStorageInfo.toStorageIDs(storages),
+ DatanodeStorageInfo.toStorageTypes(storages),
+ indices, startOffset, corrupt,
+ null);
+ }
+
+ public static LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info,
+ DatanodeStorageInfo[] locs, long offset) throws IOException {
+ final LocatedBlock lb;
+ if (info.isStriped()) {
+ lb = newLocatedStripedBlock(eb, locs,
+ info.getUnderConstructionFeature().getBlockIndices(),
+ offset, false);
+ } else {
+ lb = newLocatedBlock(eb, locs, offset, false);
+ }
+ return lb;
+ }
+
/**
- * This class is used internally by {@link this#computeRecoveryWorkForBlocks}
- * to represent a task to recover a block through replication or erasure
- * coding. Recovery is done by transferring data from srcNodes to targets
- */
- private abstract static class BlockRecoveryWork {
- final BlockInfo block;
- final BlockCollection bc;
-
- /**
- * An erasure coding recovery task has multiple source nodes.
- * A replication task only has 1 source node, stored on top of the array
- */
- final DatanodeDescriptor[] srcNodes;
- /** Nodes containing the block; avoid them in choosing new targets */
- final List<DatanodeDescriptor> containingNodes;
- /** Required by {@link BlockPlacementPolicy#chooseTarget} */
- final List<DatanodeStorageInfo> liveReplicaStorages;
- final int additionalReplRequired;
-
- DatanodeStorageInfo[] targets;
- final int priority;
-
- BlockRecoveryWork(BlockInfo block,
- BlockCollection bc,
- DatanodeDescriptor[] srcNodes,
- List<DatanodeDescriptor> containingNodes,
- List<DatanodeStorageInfo> liveReplicaStorages,
- int additionalReplRequired,
- int priority) {
- this.block = block;
- this.bc = bc;
- this.srcNodes = srcNodes;
- this.containingNodes = containingNodes;
- this.liveReplicaStorages = liveReplicaStorages;
- this.additionalReplRequired = additionalReplRequired;
- this.priority = priority;
- this.targets = null;
- }
-
- abstract void chooseTargets(BlockPlacementPolicy blockplacement,
- BlockStoragePolicySuite storagePolicySuite,
- Set<Node> excludedNodes);
- }
-
- private static class ReplicationWork extends BlockRecoveryWork {
- ReplicationWork(BlockInfo block,
- BlockCollection bc,
- DatanodeDescriptor[] srcNodes,
- List<DatanodeDescriptor> containingNodes,
- List<DatanodeStorageInfo> liveReplicaStorages,
- int additionalReplRequired,
- int priority) {
- super(block, bc, srcNodes, containingNodes,
- liveReplicaStorages, additionalReplRequired, priority);
- LOG.debug("Creating a ReplicationWork to recover " + block);
- }
-
- @Override
- void chooseTargets(BlockPlacementPolicy blockplacement,
- BlockStoragePolicySuite storagePolicySuite,
- Set<Node> excludedNodes) {
- assert srcNodes.length > 0
- : "At least 1 source node should have been selected";
- try {
- targets = blockplacement.chooseTarget(bc.getName(),
- additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
- excludedNodes, block.getNumBytes(),
- storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
- } finally {
- srcNodes[0].decrementPendingReplicationWithoutTargets();
- }
- }
- }
-
- private static class ErasureCodingWork extends BlockRecoveryWork {
- final short[] liveBlockIndicies;
-
- ErasureCodingWork(BlockInfo block,
- BlockCollection bc,
- DatanodeDescriptor[] srcNodes,
- List<DatanodeDescriptor> containingNodes,
- List<DatanodeStorageInfo> liveReplicaStorages,
- int additionalReplRequired,
- int priority, short[] liveBlockIndicies) {
- super(block, bc, srcNodes, containingNodes,
- liveReplicaStorages, additionalReplRequired, priority);
- this.liveBlockIndicies = liveBlockIndicies;
- LOG.debug("Creating an ErasureCodingWork to recover " + block);
- }
-
- @Override
- void chooseTargets(BlockPlacementPolicy blockplacement,
- BlockStoragePolicySuite storagePolicySuite,
- Set<Node> excludedNodes) {
- try {
- // TODO: new placement policy for EC considering multiple writers
- targets = blockplacement.chooseTarget(bc.getName(),
- additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
- excludedNodes, block.getNumBytes(),
- storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
- } finally {
- }
- }
- }
-
- /**
* A simple result enum for the result of
* {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockRecoveryWork.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockRecoveryWork.java
index 0000000,0000000..ed546df
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockRecoveryWork.java
@@@ -1,0 -1,0 +1,111 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs.server.blockmanagement;
++
++import org.apache.hadoop.net.Node;
++
++import java.util.Collections;
++import java.util.List;
++import java.util.Set;
++
++/**
++ * This class is used internally by
++ * {@link BlockManager#computeRecoveryWorkForBlocks} to represent a task to
++ * recover a block through replication or erasure coding. Recovery is done by
++ * transferring data from srcNodes to targets
++ */
++abstract class BlockRecoveryWork {
++ private final BlockInfo block;
++
++ private final BlockCollection bc;
++
++ /**
++ * An erasure coding recovery task has multiple source nodes.
++ * A replication task only has 1 source node, stored on top of the array
++ */
++ private final DatanodeDescriptor[] srcNodes;
++ /** Nodes containing the block; avoid them in choosing new targets */
++ private final List<DatanodeDescriptor> containingNodes;
++ /** Required by {@link BlockPlacementPolicy#chooseTarget} */
++ private final List<DatanodeStorageInfo> liveReplicaStorages;
++ private final int additionalReplRequired;
++
++ private DatanodeStorageInfo[] targets;
++ private final int priority;
++
++ public BlockRecoveryWork(BlockInfo block,
++ BlockCollection bc,
++ DatanodeDescriptor[] srcNodes,
++ List<DatanodeDescriptor> containingNodes,
++ List<DatanodeStorageInfo> liveReplicaStorages,
++ int additionalReplRequired,
++ int priority) {
++ this.block = block;
++ this.bc = bc;
++ this.srcNodes = srcNodes;
++ this.containingNodes = containingNodes;
++ this.liveReplicaStorages = liveReplicaStorages;
++ this.additionalReplRequired = additionalReplRequired;
++ this.priority = priority;
++ this.targets = null;
++ }
++
++ DatanodeStorageInfo[] getTargets() {
++ return targets;
++ }
++
++ void resetTargets() {
++ this.targets = null;
++ }
++
++ void setTargets(DatanodeStorageInfo[] targets) {
++ this.targets = targets;
++ }
++
++ List<DatanodeDescriptor> getContainingNodes() {
++ return Collections.unmodifiableList(containingNodes);
++ }
++
++ public int getPriority() {
++ return priority;
++ }
++
++ public BlockInfo getBlock() {
++ return block;
++ }
++
++ public DatanodeDescriptor[] getSrcNodes() {
++ return srcNodes;
++ }
++
++ BlockCollection getBc() {
++ return bc;
++ }
++
++ List<DatanodeStorageInfo> getLiveReplicaStorages() {
++ return liveReplicaStorages;
++ }
++
++ public int getAdditionalReplRequired() {
++ return additionalReplRequired;
++ }
++
++ abstract void chooseTargets(BlockPlacementPolicy blockplacement,
++ BlockStoragePolicySuite storagePolicySuite,
++ Set<Node> excludedNodes);
++}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
index 0000000,3842e56..a871390
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
@@@ -1,0 -1,87 +1,82 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs.server.blockmanagement;
+
+ import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
+
+ import com.google.common.base.Preconditions;
++import org.apache.hadoop.hdfs.protocol.Block;
+
+ /**
+ * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+ * list of blocks that should be considered corrupt due to a block report.
+ */
+ class BlockToMarkCorrupt {
+ /** The corrupted block in a datanode. */
- private final BlockInfo corrupted;
++ private final Block corrupted;
+ /** The corresponding block stored in the BlockManager. */
+ private final BlockInfo stored;
+ /** The reason to mark corrupt. */
+ private final String reason;
+ /** The reason code to be stored */
+ private final CorruptReplicasMap.Reason reasonCode;
+
- BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
++ BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason,
+ CorruptReplicasMap.Reason reasonCode) {
+ Preconditions.checkNotNull(corrupted, "corrupted is null");
+ Preconditions.checkNotNull(stored, "stored is null");
+
+ this.corrupted = corrupted;
+ this.stored = stored;
+ this.reason = reason;
+ this.reasonCode = reasonCode;
+ }
+
- BlockToMarkCorrupt(BlockInfo stored, String reason,
++ BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs, String reason,
+ CorruptReplicasMap.Reason reasonCode) {
- this(stored, stored, reason, reasonCode);
- }
-
- BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
- CorruptReplicasMap.Reason reasonCode) {
- this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
- reason, reasonCode);
++ this(corrupted, stored, reason, reasonCode);
+ //the corrupted block in datanode has a different generation stamp
- corrupted.setGenerationStamp(gs);
++ this.corrupted.setGenerationStamp(gs);
+ }
+
+ public boolean isCorruptedDuringWrite() {
+ return stored.getGenerationStamp() > corrupted.getGenerationStamp();
+ }
+
- public BlockInfo getCorrupted() {
++ public Block getCorrupted() {
+ return corrupted;
+ }
+
+ public BlockInfo getStored() {
+ return stored;
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+ public Reason getReasonCode() {
+ return reasonCode;
+ }
+
+ @Override
+ public String toString() {
+ return corrupted + "("
+ + (corrupted == stored ? "same as stored": "stored=" + stored) + ")";
+ }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 29e541c,0b398c5..b258f06
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@@ -38,9 -39,8 +39,10 @@@ import org.apache.hadoop.fs.StorageType
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@@ -696,29 -663,32 +698,39 @@@ public class DatanodeDescriptor extend
}
}
+ @VisibleForTesting
+ public boolean containsInvalidateBlock(Block block) {
+ synchronized (invalidateBlocks) {
+ return invalidateBlocks.contains(block);
+ }
+ }
+
/**
- * Return the sum of remaining spaces of the specified type. If the remaining
- * space of a storage is less than minSize, it won't be counted toward the
- * sum.
+ * Find whether the datanode contains good storage of given type to
+ * place block of size <code>blockSize</code>.
+ *
+ * <p>Currently datanode only cares about the storage type, in this
+ * method, the first storage of given type we see is returned.
*
- * @param t The storage type. If null, the type is ignored.
- * @param minSize The minimum free space required.
- * @return the sum of remaining spaces that are bigger than minSize.
+ * @param t requested storage type
+ * @param blockSize requested block size
+ * @return
*/
- public long getRemaining(StorageType t, long minSize) {
+ public DatanodeStorageInfo chooseStorage4Block(StorageType t,
+ long blockSize) {
+ final long requiredSize =
+ blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
+ final long scheduledSize = blockSize * getBlocksScheduled(t);
long remaining = 0;
+ DatanodeStorageInfo storage = null;
for (DatanodeStorageInfo s : getStorageInfos()) {
if (s.getState() == State.NORMAL &&
- (t == null || s.getStorageType() == t)) {
+ s.getStorageType() == t) {
+ if (storage == null) {
+ storage = s;
+ }
long r = s.getRemaining();
- if (r >= minSize) {
+ if (r >= requiredSize) {
remaining += r;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 0000000,0000000..761d6d0
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@@ -1,0 -1,0 +1,60 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs.server.blockmanagement;
++
++import org.apache.hadoop.net.Node;
++
++import java.util.List;
++import java.util.Set;
++
++class ErasureCodingWork extends BlockRecoveryWork {
++ private final short[] liveBlockIndicies;
++
++ public ErasureCodingWork(BlockInfo block,
++ BlockCollection bc,
++ DatanodeDescriptor[] srcNodes,
++ List<DatanodeDescriptor> containingNodes,
++ List<DatanodeStorageInfo> liveReplicaStorages,
++ int additionalReplRequired,
++ int priority, short[] liveBlockIndicies) {
++ super(block, bc, srcNodes, containingNodes,
++ liveReplicaStorages, additionalReplRequired, priority);
++ this.liveBlockIndicies = liveBlockIndicies;
++ BlockManager.LOG.debug("Creating an ErasureCodingWork to recover " + block);
++ }
++
++ short[] getLiveBlockIndicies() {
++ return liveBlockIndicies;
++ }
++
++ @Override
++ void chooseTargets(BlockPlacementPolicy blockplacement,
++ BlockStoragePolicySuite storagePolicySuite,
++ Set<Node> excludedNodes) {
++ try {
++ // TODO: new placement policy for EC considering multiple writers
++ DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
++ getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
++ getLiveReplicaStorages(), false, excludedNodes,
++ getBlock().getNumBytes(),
++ storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
++ setTargets(chosenTargets);
++ } finally {
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
index 0000000,f8a6dad..8266f45
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
@@@ -1,0 -1,87 +1,53 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs.server.blockmanagement;
+
+ import org.apache.hadoop.net.Node;
+
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.Set;
+
-class ReplicationWork {
- private final BlockInfo block;
- private final BlockCollection bc;
- private final DatanodeDescriptor srcNode;
- private final int additionalReplRequired;
- private final int priority;
- private final List<DatanodeDescriptor> containingNodes;
- private final List<DatanodeStorageInfo> liveReplicaStorages;
- private DatanodeStorageInfo[] targets;
-
++class ReplicationWork extends BlockRecoveryWork {
+ public ReplicationWork(BlockInfo block, BlockCollection bc,
- DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes,
++ DatanodeDescriptor[] srcNodes, List<DatanodeDescriptor> containingNodes,
+ List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
+ int priority) {
- this.block = block;
- this.bc = bc;
- this.srcNode = srcNode;
- this.srcNode.incrementPendingReplicationWithoutTargets();
- this.containingNodes = containingNodes;
- this.liveReplicaStorages = liveReplicaStorages;
- this.additionalReplRequired = additionalReplRequired;
- this.priority = priority;
- this.targets = null;
++ super(block, bc, srcNodes, containingNodes,
++ liveReplicaStorages, additionalReplRequired, priority);
++ BlockManager.LOG.debug("Creating a ReplicationWork to recover " + block);
+ }
+
++ @Override
+ void chooseTargets(BlockPlacementPolicy blockplacement,
+ BlockStoragePolicySuite storagePolicySuite,
+ Set<Node> excludedNodes) {
++ assert getSrcNodes().length > 0
++ : "At least 1 source node should have been selected";
+ try {
- targets = blockplacement.chooseTarget(bc.getName(),
- additionalReplRequired, srcNode, liveReplicaStorages, false,
- excludedNodes, block.getNumBytes(),
- storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
++ DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
++ getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
++ getLiveReplicaStorages(), false, excludedNodes,
++ getBlock().getNumBytes(),
++ storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
++ setTargets(chosenTargets);
+ } finally {
- srcNode.decrementPendingReplicationWithoutTargets();
++ getSrcNodes()[0].decrementPendingReplicationWithoutTargets();
+ }
+ }
-
- DatanodeStorageInfo[] getTargets() {
- return targets;
- }
-
- void resetTargets() {
- this.targets = null;
- }
-
- List<DatanodeDescriptor> getContainingNodes() {
- return Collections.unmodifiableList(containingNodes);
- }
-
- public int getPriority() {
- return priority;
- }
-
- public BlockInfo getBlock() {
- return block;
- }
-
- public DatanodeDescriptor getSrcNode() {
- return srcNode;
- }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
index 3c77120,0000000..6cc1dcd
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
@@@ -1,250 -1,0 +1,220 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Test {@link BlockInfoStriped}
+ */
+public class TestBlockInfoStriped {
+ private static final int TOTAL_NUM_BLOCKS = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+ private static final long BASE_ID = -1600;
+ private static final Block baseBlock = new Block(BASE_ID);
+ private static final ErasureCodingPolicy testECPolicy
+ = ErasureCodingPolicyManager.getSystemDefaultPolicy();
+ private final BlockInfoStriped info = new BlockInfoStriped(baseBlock,
+ testECPolicy);
+
+ private Block[] createReportedBlocks(int num) {
+ Block[] blocks = new Block[num];
+ for (int i = 0; i < num; i++) {
+ blocks[i] = new Block(BASE_ID + i);
+ }
+ return blocks;
+ }
+
+ /**
+ * Test adding storage and reported block
+ */
+ @Test
+ public void testAddStorage() {
+ // first add NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS storages, i.e., a complete
+ // group of blocks/storages
+ DatanodeStorageInfo[] storageInfos = DFSTestUtil.createDatanodeStorageInfos(
+ TOTAL_NUM_BLOCKS);
+ Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
+ int i = 0;
+ for (; i < storageInfos.length; i += 2) {
+ info.addStorage(storageInfos[i], blocks[i]);
+ Assert.assertEquals(i/2 + 1, info.numNodes());
+ }
+ i /= 2;
+ for (int j = 1; j < storageInfos.length; j += 2) {
+ Assert.assertTrue(info.addStorage(storageInfos[j], blocks[j]));
+ Assert.assertEquals(i + (j+1)/2, info.numNodes());
+ }
+
+ // check
+ byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
+ Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
+ Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length);
+ i = 0;
+ for (DatanodeStorageInfo storage : storageInfos) {
+ int index = info.findStorageInfo(storage);
+ Assert.assertEquals(i++, index);
+ Assert.assertEquals(index, indices[index]);
+ }
+
+ // the same block is reported from the same storage twice
+ i = 0;
+ for (DatanodeStorageInfo storage : storageInfos) {
+ Assert.assertTrue(info.addStorage(storage, blocks[i++]));
+ }
+ Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
+ Assert.assertEquals(TOTAL_NUM_BLOCKS, info.numNodes());
+ Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length);
+ i = 0;
+ for (DatanodeStorageInfo storage : storageInfos) {
+ int index = info.findStorageInfo(storage);
+ Assert.assertEquals(i++, index);
+ Assert.assertEquals(index, indices[index]);
+ }
+
+ // the same block is reported from another storage
+ DatanodeStorageInfo[] storageInfos2 = DFSTestUtil.createDatanodeStorageInfos(
+ TOTAL_NUM_BLOCKS * 2);
+ // only add the second half of info2
+ for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) {
+ info.addStorage(storageInfos2[i], blocks[i % TOTAL_NUM_BLOCKS]);
+ Assert.assertEquals(i + 1, info.getCapacity());
+ Assert.assertEquals(i + 1, info.numNodes());
+ indices = (byte[]) Whitebox.getInternalState(info, "indices");
+ Assert.assertEquals(i + 1, indices.length);
+ }
+ for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) {
+ int index = info.findStorageInfo(storageInfos2[i]);
+ Assert.assertEquals(i++, index);
+ Assert.assertEquals(index - TOTAL_NUM_BLOCKS, indices[index]);
+ }
+ }
+
+ @Test
+ public void testRemoveStorage() {
+ // first add TOTAL_NUM_BLOCKS into the BlockInfoStriped
+ DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
+ TOTAL_NUM_BLOCKS);
+ Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
+ for (int i = 0; i < storages.length; i++) {
+ info.addStorage(storages[i], blocks[i]);
+ }
+
+ // remove two storages
+ info.removeStorage(storages[0]);
+ info.removeStorage(storages[2]);
+
+ // check
+ Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
+ Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes());
+ byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
+ for (int i = 0; i < storages.length; i++) {
+ int index = info.findStorageInfo(storages[i]);
+ if (i != 0 && i != 2) {
+ Assert.assertEquals(i, index);
+ Assert.assertEquals(index, indices[index]);
+ } else {
+ Assert.assertEquals(-1, index);
+ Assert.assertEquals(-1, indices[i]);
+ }
+ }
+
+ // the same block is reported from another storage
+ DatanodeStorageInfo[] storages2 = DFSTestUtil.createDatanodeStorageInfos(
+ TOTAL_NUM_BLOCKS * 2);
+ for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) {
+ info.addStorage(storages2[i], blocks[i % TOTAL_NUM_BLOCKS]);
+ }
+ // now we should have 8 storages
+ Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.numNodes());
+ Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity());
+ indices = (byte[]) Whitebox.getInternalState(info, "indices");
+ Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length);
+ int j = TOTAL_NUM_BLOCKS;
+ for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) {
+ int index = info.findStorageInfo(storages2[i]);
+ if (i == TOTAL_NUM_BLOCKS || i == TOTAL_NUM_BLOCKS + 2) {
+ Assert.assertEquals(i - TOTAL_NUM_BLOCKS, index);
+ } else {
+ Assert.assertEquals(j++, index);
+ }
+ }
+
+ // remove the storages from storages2
+ for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) {
+ info.removeStorage(storages2[i + TOTAL_NUM_BLOCKS]);
+ }
+ // now we should have 3 storages
+ Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes());
+ Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity());
+ indices = (byte[]) Whitebox.getInternalState(info, "indices");
+ Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length);
+ for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) {
+ if (i == 0 || i == 2) {
+ int index = info.findStorageInfo(storages2[i + TOTAL_NUM_BLOCKS]);
+ Assert.assertEquals(-1, index);
+ } else {
+ int index = info.findStorageInfo(storages[i]);
+ Assert.assertEquals(i, index);
+ }
+ }
+ for (int i = TOTAL_NUM_BLOCKS; i < TOTAL_NUM_BLOCKS * 2 - 2; i++) {
+ Assert.assertEquals(-1, indices[i]);
+ Assert.assertNull(info.getDatanode(i));
+ }
+ }
+
+ @Test
- public void testReplaceBlock() {
- DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
- TOTAL_NUM_BLOCKS);
- Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
- // add block/storage 0, 2, 4 into the BlockInfoStriped
- for (int i = 0; i < storages.length; i += 2) {
- Assert.assertEquals(AddBlockResult.ADDED,
- storages[i].addBlock(info, blocks[i]));
- }
-
- BlockInfoStriped newBlockInfo = new BlockInfoStriped(info,
- info.getErasureCodingPolicy());
- newBlockInfo.setBlockCollectionId(info.getBlockCollectionId());
- info.replaceBlock(newBlockInfo);
-
- // make sure the newBlockInfo is correct
- byte[] indices = (byte[]) Whitebox.getInternalState(newBlockInfo, "indices");
- for (int i = 0; i < storages.length; i += 2) {
- int index = newBlockInfo.findStorageInfo(storages[i]);
- Assert.assertEquals(i, index);
- Assert.assertEquals(index, indices[i]);
-
- // make sure the newBlockInfo is added to the linked list of the storage
- Assert.assertSame(newBlockInfo, storages[i].getBlockListHeadForTesting());
- Assert.assertEquals(1, storages[i].numBlocks());
- Assert.assertNull(newBlockInfo.getNext());
- }
- }
-
- @Test
+ public void testWrite() {
+ long blkID = 1;
+ long numBytes = 1;
+ long generationStamp = 1;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE * 3);
+ byteBuffer.putLong(blkID).putLong(numBytes).putLong(generationStamp);
+
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(byteStream);
+ BlockInfoStriped blk = new BlockInfoStriped(new Block(blkID, numBytes,
+ generationStamp), testECPolicy);
+
+ try {
+ blk.write(out);
+ } catch(Exception ex) {
+ fail("testWrite error:" + ex.getMessage());
+ }
+ assertEquals(byteBuffer.array().length, byteStream.toByteArray().length);
+ assertArrayEquals(byteBuffer.array(), byteStream.toByteArray());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------