You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:35 UTC
[23/50] [abbrv] hadoop git commit: HDFS-8938. Extract
BlockToMarkCorrupt and ReplicationWork as standalone classes from
BlockManager. Contributed by Mingliang Liu.
HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6d12cd8d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6d12cd8d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6d12cd8d
Branch: refs/heads/HDFS-7285
Commit: 6d12cd8d609dec26d44cece9937c35b7d72a3cd1
Parents: cbb2495
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Aug 28 14:10:40 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Aug 28 14:14:32 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../server/blockmanagement/BlockManager.java | 368 +++++++------------
.../blockmanagement/BlockToMarkCorrupt.java | 87 +++++
.../server/blockmanagement/ReplicationWork.java | 87 +++++
4 files changed, 319 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d12cd8d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 67a6a6e..c6acfc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -855,6 +855,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8865. Improve quota initialization performance. (kihwal)
+ HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone
+ classes from BlockManager. (Mingliang Liu via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d12cd8d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 95933d2..8f7bb55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1181,24 +1181,24 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo storageInfo,
DatanodeDescriptor node) throws IOException {
- if (b.corrupted.isDeleted()) {
+ if (b.getCorrupted().isDeleted()) {
blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
" corrupt as it does not belong to any file", b);
- addToInvalidates(b.corrupted, node);
+ addToInvalidates(b.getCorrupted(), node);
return;
}
- short expectedReplicas = b.corrupted.getReplication();
+ short expectedReplicas = b.getCorrupted().getReplication();
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
- storageInfo.addBlock(b.stored);
+ storageInfo.addBlock(b.getStored());
}
// Add this replica to corruptReplicas Map
- corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
- b.reasonCode);
+ corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node,
+ b.getReason(), b.getReasonCode());
- NumberReplicas numberOfReplicas = countNodes(b.stored);
+ NumberReplicas numberOfReplicas = countNodes(b.getStored());
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
expectedReplicas;
boolean minReplicationSatisfied =
@@ -1207,7 +1207,7 @@ public class BlockManager implements BlockStatsMXBean {
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
expectedReplicas;
boolean corruptedDuringWrite = minReplicationSatisfied &&
- (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
+ b.isCorruptedDuringWrite();
// case 1: have enough number of live replicas
// case 2: corrupted replicas + live replicas > Replication factor
// case 3: Block is marked corrupt due to failure while writing. In this
@@ -1220,7 +1220,7 @@ public class BlockManager implements BlockStatsMXBean {
invalidateBlock(b, node);
} else if (namesystem.isPopulatingReplQueues()) {
// add the block to neededReplication
- updateNeededReplications(b.stored, -1, 0);
+ updateNeededReplications(b.getStored(), -1, 0);
}
}
@@ -1239,18 +1239,18 @@ public class BlockManager implements BlockStatsMXBean {
}
// Check how many copies we have of the block
- NumberReplicas nr = countNodes(b.stored);
+ NumberReplicas nr = countNodes(b.getStored());
if (nr.replicasOnStaleNodes() > 0) {
blockLog.debug("BLOCK* invalidateBlocks: postponing " +
"invalidation of {} on {} because {} replica(s) are located on " +
"nodes with potentially out-of-date block reports", b, dn,
nr.replicasOnStaleNodes());
- postponeBlock(b.corrupted);
+ postponeBlock(b.getCorrupted());
return false;
} else if (nr.liveReplicas() >= 1) {
// If we have at least one copy on a live node, then we can delete it.
- addToInvalidates(b.corrupted, dn);
- removeStoredBlock(b.stored, node);
+ addToInvalidates(b.getCorrupted(), dn);
+ removeStoredBlock(b.getStored(), node);
blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
b, dn);
return true;
@@ -1338,71 +1338,18 @@ public class BlockManager implements BlockStatsMXBean {
*/
@VisibleForTesting
int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
- int requiredReplication, numEffectiveReplicas;
- List<DatanodeDescriptor> containingNodes;
- DatanodeDescriptor srcNode;
- BlockCollection bc = null;
- int additionalReplRequired;
-
int scheduledWork = 0;
- List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+ final List<ReplicationWork> work = new LinkedList<>();
namesystem.writeLock();
try {
synchronized (neededReplications) {
for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
for (BlockInfo block : blocksToReplicate.get(priority)) {
- // block should belong to a file
- bc = getBlockCollection(block);
- // abandoned block or block reopened for append
- if (bc == null
- || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
- // remove from neededReplications
- neededReplications.remove(block, priority);
- continue;
- }
-
- requiredReplication = getExpectedReplicaNum(block);
-
- // get a source data-node
- containingNodes = new ArrayList<DatanodeDescriptor>();
- List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
- NumberReplicas numReplicas = new NumberReplicas();
- srcNode = chooseSourceDatanode(
- block, containingNodes, liveReplicaNodes, numReplicas,
- priority);
- if(srcNode == null) { // block can not be replicated from any node
- LOG.debug("Block " + block + " cannot be repl from any node");
- continue;
- }
-
- // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
- // not included in the numReplicas.liveReplicas() count
- assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
-
- // do not schedule more if enough replicas is already pending
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
-
- if (numEffectiveReplicas >= requiredReplication) {
- if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
- neededReplications.remove(block, priority); // remove from neededReplications
- blockLog.debug("BLOCK* Removing {} from neededReplications as" +
- " it has enough replicas", block);
- continue;
- }
- }
-
- if (numReplicas.liveReplicas() < requiredReplication) {
- additionalReplRequired = requiredReplication
- - numEffectiveReplicas;
- } else {
- additionalReplRequired = 1; // Needed on a new rack
+ ReplicationWork rw = scheduleReplication(block, priority);
+ if (rw != null) {
+ work.add(rw);
}
- work.add(new ReplicationWork(block, bc, srcNode,
- containingNodes, liveReplicaNodes, additionalReplRequired,
- priority));
}
}
}
@@ -1410,12 +1357,12 @@ public class BlockManager implements BlockStatsMXBean {
namesystem.writeUnlock();
}
- final Set<Node> excludedNodes = new HashSet<Node>();
+ final Set<Node> excludedNodes = new HashSet<>();
for(ReplicationWork rw : work){
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
excludedNodes.clear();
- for (DatanodeDescriptor dn : rw.containingNodes) {
+ for (DatanodeDescriptor dn : rw.getContainingNodes()) {
excludedNodes.add(dn);
}
@@ -1428,67 +1375,15 @@ public class BlockManager implements BlockStatsMXBean {
namesystem.writeLock();
try {
for(ReplicationWork rw : work){
- final DatanodeStorageInfo[] targets = rw.targets;
+ final DatanodeStorageInfo[] targets = rw.getTargets();
if(targets == null || targets.length == 0){
- rw.targets = null;
+ rw.resetTargets();
continue;
}
synchronized (neededReplications) {
- BlockInfo block = rw.block;
- int priority = rw.priority;
- // Recheck since global lock was released
- // block should belong to a file
- bc = getBlockCollection(block);
- // abandoned block or block reopened for append
- if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
- neededReplications.remove(block, priority); // remove from neededReplications
- rw.targets = null;
- continue;
- }
- requiredReplication = getExpectedReplicaNum(block);
-
- // do not schedule more if enough replicas is already pending
- NumberReplicas numReplicas = countNodes(block);
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
-
- if (numEffectiveReplicas >= requiredReplication) {
- if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
- neededReplications.remove(block, priority); // remove from neededReplications
- rw.targets = null;
- blockLog.debug("BLOCK* Removing {} from neededReplications as" +
- " it has enough replicas", block);
- continue;
- }
- }
-
- if ( (numReplicas.liveReplicas() >= requiredReplication) &&
- (!blockHasEnoughRacks(block)) ) {
- if (rw.srcNode.getNetworkLocation().equals(
- targets[0].getDatanodeDescriptor().getNetworkLocation())) {
- //No use continuing, unless a new rack in this case
- continue;
- }
- }
-
- // Add block to the to be replicated list
- rw.srcNode.addBlockToBeReplicated(block, targets);
- scheduledWork++;
- DatanodeStorageInfo.incrementBlocksScheduled(targets);
-
- // Move the block-replication into a "pending" state.
- // The reason we use 'pending' is so we can retry
- // replications that fail after an appropriate amount of time.
- pendingReplications.increment(block,
- DatanodeStorageInfo.toDatanodeDescriptors(targets));
- blockLog.debug("BLOCK* block {} is moved from neededReplications to "
- + "pendingReplications", block);
-
- // remove from neededReplications
- if(numEffectiveReplicas + targets.length >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
+ if (validateReplicationWork(rw)) {
+ scheduledWork++;
}
}
}
@@ -1499,15 +1394,15 @@ public class BlockManager implements BlockStatsMXBean {
if (blockLog.isInfoEnabled()) {
// log which blocks have been scheduled for replication
for(ReplicationWork rw : work){
- DatanodeStorageInfo[] targets = rw.targets;
+ DatanodeStorageInfo[] targets = rw.getTargets();
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
- for (int k = 0; k < targets.length; k++) {
+ for (DatanodeStorageInfo target : targets) {
targetList.append(' ');
- targetList.append(targets[k].getDatanodeDescriptor());
+ targetList.append(target.getDatanodeDescriptor());
}
- blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
- rw.block, targetList);
+ blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(),
+ rw.getBlock(), targetList);
}
}
}
@@ -1519,6 +1414,118 @@ public class BlockManager implements BlockStatsMXBean {
return scheduledWork;
}
+ boolean hasEnoughEffectiveReplicas(BlockInfo block,
+ NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+ int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
+ return (numEffectiveReplicas >= required) &&
+ (pendingReplicaNum > 0 || blockHasEnoughRacks(block));
+ }
+
+ private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
+ // block should belong to a file
+ BlockCollection bc = getBlockCollection(block);
+ // abandoned block or block reopened for append
+ if (bc == null
+ || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+ // remove from neededReplications
+ neededReplications.remove(block, priority);
+ return null;
+ }
+
+ short requiredReplication = getExpectedReplicaNum(block);
+
+ // get a source data-node
+ List<DatanodeDescriptor> containingNodes = new ArrayList<>();
+ List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
+ NumberReplicas numReplicas = new NumberReplicas();
+ DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes,
+ liveReplicaNodes, numReplicas, priority);
+ if (srcNode == null) { // block can not be replicated from any node
+ LOG.debug("Block " + block + " cannot be repl from any node");
+ return null;
+ }
+
+ // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
+ // not included in the numReplicas.liveReplicas() count
+ assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
+
+ int pendingNum = pendingReplications.getNumReplicas(block);
+ if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+ requiredReplication)) {
+ neededReplications.remove(block, priority);
+ blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+ " it has enough replicas", block);
+ return null;
+ }
+
+ final int additionalReplRequired;
+ if (numReplicas.liveReplicas() < requiredReplication) {
+ additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
+ - pendingNum;
+ } else {
+ additionalReplRequired = 1; // Needed on a new rack
+ }
+ return new ReplicationWork(block, bc, srcNode, containingNodes,
+ liveReplicaNodes, additionalReplRequired, priority);
+ }
+
+ private boolean validateReplicationWork(ReplicationWork rw) {
+ BlockInfo block = rw.getBlock();
+ int priority = rw.getPriority();
+ // Recheck since global lock was released
+ // block should belong to a file
+ BlockCollection bc = getBlockCollection(block);
+ // abandoned block or block reopened for append
+ if (bc == null
+ || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+ neededReplications.remove(block, priority);
+ rw.resetTargets();
+ return false;
+ }
+
+ // do not schedule more if enough replicas are already pending
+ final short requiredReplication = getExpectedReplicaNum(block);
+ NumberReplicas numReplicas = countNodes(block);
+ final int pendingNum = pendingReplications.getNumReplicas(block);
+ if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
+ requiredReplication)) {
+ neededReplications.remove(block, priority);
+ rw.resetTargets();
+ blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+ " it has enough replicas", block);
+ return false;
+ }
+
+ DatanodeStorageInfo[] targets = rw.getTargets();
+ if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+ (!blockHasEnoughRacks(block)) ) {
+ if (rw.getSrcNode().getNetworkLocation().equals(
+ targets[0].getDatanodeDescriptor().getNetworkLocation())) {
+ // No use continuing unless the first target is on a new rack
+ return false;
+ }
+ }
+
+ // Add block to the to be replicated list
+ rw.getSrcNode().addBlockToBeReplicated(block, targets);
+ DatanodeStorageInfo.incrementBlocksScheduled(targets);
+
+ // Move the block-replication into a "pending" state.
+ // The reason we use 'pending' is so we can retry
+ // replications that fail after an appropriate amount of time.
+ pendingReplications.increment(block,
+ DatanodeStorageInfo.toDatanodeDescriptors(targets));
+ blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+ + "pendingReplications", block);
+
+ int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
+ // remove from neededReplications
+ if(numEffectiveReplicas + targets.length >= requiredReplication) {
+ neededReplications.remove(block, priority);
+ }
+ return true;
+ }
+
/** Choose target for WebHDFS redirection. */
public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
@@ -1765,52 +1772,6 @@ public class BlockManager implements BlockStatsMXBean {
this.reportedState = reportedState;
}
}
-
- /**
- * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
- * list of blocks that should be considered corrupt due to a block report.
- */
- private static class BlockToMarkCorrupt {
- /** The corrupted block in a datanode. */
- final BlockInfo corrupted;
- /** The corresponding block stored in the BlockManager. */
- final BlockInfo stored;
- /** The reason to mark corrupt. */
- final String reason;
- /** The reason code to be stored */
- final Reason reasonCode;
-
- BlockToMarkCorrupt(BlockInfo corrupted,
- BlockInfo stored, String reason,
- Reason reasonCode) {
- Preconditions.checkNotNull(corrupted, "corrupted is null");
- Preconditions.checkNotNull(stored, "stored is null");
-
- this.corrupted = corrupted;
- this.stored = stored;
- this.reason = reason;
- this.reasonCode = reasonCode;
- }
-
- BlockToMarkCorrupt(BlockInfo stored, String reason,
- Reason reasonCode) {
- this(stored, stored, reason, reasonCode);
- }
-
- BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
- Reason reasonCode) {
- this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
- reason, reasonCode);
- //the corrupted block in datanode has a different generation stamp
- corrupted.setGenerationStamp(gs);
- }
-
- @Override
- public String toString() {
- return corrupted + "("
- + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
- }
- }
/**
* The given storage is reporting all its blocks.
@@ -3797,51 +3758,6 @@ public class BlockManager implements BlockStatsMXBean {
null);
}
- private static class ReplicationWork {
-
- private final BlockInfo block;
- private final BlockCollection bc;
-
- private final DatanodeDescriptor srcNode;
- private final List<DatanodeDescriptor> containingNodes;
- private final List<DatanodeStorageInfo> liveReplicaStorages;
- private final int additionalReplRequired;
-
- private DatanodeStorageInfo targets[];
- private final int priority;
-
- public ReplicationWork(BlockInfo block,
- BlockCollection bc,
- DatanodeDescriptor srcNode,
- List<DatanodeDescriptor> containingNodes,
- List<DatanodeStorageInfo> liveReplicaStorages,
- int additionalReplRequired,
- int priority) {
- this.block = block;
- this.bc = bc;
- this.srcNode = srcNode;
- this.srcNode.incrementPendingReplicationWithoutTargets();
- this.containingNodes = containingNodes;
- this.liveReplicaStorages = liveReplicaStorages;
- this.additionalReplRequired = additionalReplRequired;
- this.priority = priority;
- this.targets = null;
- }
-
- private void chooseTargets(BlockPlacementPolicy blockplacement,
- BlockStoragePolicySuite storagePolicySuite,
- Set<Node> excludedNodes) {
- try {
- targets = blockplacement.chooseTarget(bc.getName(),
- additionalReplRequired, srcNode, liveReplicaStorages, false,
- excludedNodes, block.getNumBytes(),
- storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
- } finally {
- srcNode.decrementPendingReplicationWithoutTargets();
- }
- }
- }
-
/**
* A simple result enum for the result of
* {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
@@ -3855,9 +3771,9 @@ public class BlockManager implements BlockStatsMXBean {
OVER_REPLICATED,
/** A decision can't currently be made about this block. */
POSTPONE,
- /** The block is under construction, so should be ignored */
+ /** The block is under construction, so should be ignored. */
UNDER_CONSTRUCTION,
- /** The block is properly replicated */
+ /** The block is properly replicated. */
OK
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d12cd8d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
new file mode 100644
index 0000000..3842e56
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+ * list of blocks that should be considered corrupt due to a block report.
+ */
+class BlockToMarkCorrupt {
+ /** The corrupted block in a datanode. */
+ private final BlockInfo corrupted;
+ /** The corresponding block stored in the BlockManager. */
+ private final BlockInfo stored;
+ /** The reason to mark corrupt. */
+ private final String reason;
+ /** The reason code to be stored. */
+ private final CorruptReplicasMap.Reason reasonCode;
+
+ BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
+ CorruptReplicasMap.Reason reasonCode) {
+ Preconditions.checkNotNull(corrupted, "corrupted is null");
+ Preconditions.checkNotNull(stored, "stored is null");
+
+ this.corrupted = corrupted;
+ this.stored = stored;
+ this.reason = reason;
+ this.reasonCode = reasonCode;
+ }
+
+ BlockToMarkCorrupt(BlockInfo stored, String reason,
+ CorruptReplicasMap.Reason reasonCode) {
+ this(stored, stored, reason, reasonCode);
+ }
+
+ BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+ CorruptReplicasMap.Reason reasonCode) {
+ this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
+ reason, reasonCode);
+ // the corrupted block in the datanode has a different generation stamp
+ corrupted.setGenerationStamp(gs);
+ }
+
+ public boolean isCorruptedDuringWrite() {
+ return stored.getGenerationStamp() > corrupted.getGenerationStamp();
+ }
+
+ public BlockInfo getCorrupted() {
+ return corrupted;
+ }
+
+ public BlockInfo getStored() {
+ return stored;
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+ public Reason getReasonCode() {
+ return reasonCode;
+ }
+
+ @Override
+ public String toString() {
+ return corrupted + "("
+ + (corrupted == stored ? "same as stored": "stored=" + stored) + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d12cd8d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
new file mode 100644
index 0000000..f8a6dad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.net.Node;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+class ReplicationWork {
+ private final BlockInfo block;
+ private final BlockCollection bc;
+ private final DatanodeDescriptor srcNode;
+ private final int additionalReplRequired;
+ private final int priority;
+ private final List<DatanodeDescriptor> containingNodes;
+ private final List<DatanodeStorageInfo> liveReplicaStorages;
+ private DatanodeStorageInfo[] targets;
+
+ public ReplicationWork(BlockInfo block, BlockCollection bc,
+ DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes,
+ List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
+ int priority) {
+ this.block = block;
+ this.bc = bc;
+ this.srcNode = srcNode;
+ this.srcNode.incrementPendingReplicationWithoutTargets();
+ this.containingNodes = containingNodes;
+ this.liveReplicaStorages = liveReplicaStorages;
+ this.additionalReplRequired = additionalReplRequired;
+ this.priority = priority;
+ this.targets = null;
+ }
+
+ void chooseTargets(BlockPlacementPolicy blockplacement,
+ BlockStoragePolicySuite storagePolicySuite,
+ Set<Node> excludedNodes) {
+ try {
+ targets = blockplacement.chooseTarget(bc.getName(),
+ additionalReplRequired, srcNode, liveReplicaStorages, false,
+ excludedNodes, block.getNumBytes(),
+ storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+ } finally {
+ srcNode.decrementPendingReplicationWithoutTargets();
+ }
+ }
+
+ DatanodeStorageInfo[] getTargets() {
+ return targets;
+ }
+
+ void resetTargets() {
+ this.targets = null;
+ }
+
+ List<DatanodeDescriptor> getContainingNodes() {
+ return Collections.unmodifiableList(containingNodes);
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public BlockInfo getBlock() {
+ return block;
+ }
+
+ public DatanodeDescriptor getSrcNode() {
+ return srcNode;
+ }
+}