You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:30 UTC
[18/50] [abbrv] hadoop git commit: Revert "HDFS-8938. Extract
BlockToMarkCorrupt and ReplicationWork as standalone classes from
BlockManager. Contributed by Mingliang Liu."
Revert "HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone classes from BlockManager. Contributed by Mingliang Liu."
This reverts commit 4e9307f26dd41270f95fb50166e1a091852e4d58.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/035ed261
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/035ed261
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/035ed261
Branch: refs/heads/HDFS-7285
Commit: 035ed26147f10620fc6ed3a514d9ebbcc31304b5
Parents: 4e9307f
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Aug 27 16:09:35 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Aug 27 16:09:35 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 -
.../server/blockmanagement/BlockManager.java | 368 ++++++++++++-------
.../blockmanagement/BlockToMarkCorrupt.java | 87 -----
.../server/blockmanagement/ReplicationWork.java | 87 -----
4 files changed, 226 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/035ed261/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c4c8c3b..9cc3326 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -853,9 +853,6 @@ Release 2.8.0 - UNRELEASED
HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf.
(Mingliang Liu via wheat9)
- HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone
- classes from BlockManager. (Mingliang Liu via wheat9)
-
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/035ed261/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5a05fa7..95933d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1181,24 +1181,24 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo storageInfo,
DatanodeDescriptor node) throws IOException {
- if (b.getCorrupted().isDeleted()) {
+ if (b.corrupted.isDeleted()) {
blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
" corrupt as it does not belong to any file", b);
- addToInvalidates(b.getCorrupted(), node);
+ addToInvalidates(b.corrupted, node);
return;
}
- short expectedReplicas = b.getCorrupted().getReplication();
+ short expectedReplicas = b.corrupted.getReplication();
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
- storageInfo.addBlock(b.getStored());
+ storageInfo.addBlock(b.stored);
}
// Add this replica to corruptReplicas Map
- corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node,
- b.getReason(), b.getReasonCode());
+ corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+ b.reasonCode);
- NumberReplicas numberOfReplicas = countNodes(b.getStored());
+ NumberReplicas numberOfReplicas = countNodes(b.stored);
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
expectedReplicas;
boolean minReplicationSatisfied =
@@ -1207,7 +1207,7 @@ public class BlockManager implements BlockStatsMXBean {
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
expectedReplicas;
boolean corruptedDuringWrite = minReplicationSatisfied &&
- b.isCorruptedDuringWrite();
+ (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
// case 1: have enough number of live replicas
// case 2: corrupted replicas + live replicas > Replication factor
// case 3: Block is marked corrupt due to failure while writing. In this
@@ -1220,7 +1220,7 @@ public class BlockManager implements BlockStatsMXBean {
invalidateBlock(b, node);
} else if (namesystem.isPopulatingReplQueues()) {
// add the block to neededReplication
- updateNeededReplications(b.getStored(), -1, 0);
+ updateNeededReplications(b.stored, -1, 0);
}
}
@@ -1239,18 +1239,18 @@ public class BlockManager implements BlockStatsMXBean {
}
// Check how many copies we have of the block
- NumberReplicas nr = countNodes(b.getStored());
+ NumberReplicas nr = countNodes(b.stored);
if (nr.replicasOnStaleNodes() > 0) {
blockLog.debug("BLOCK* invalidateBlocks: postponing " +
"invalidation of {} on {} because {} replica(s) are located on " +
"nodes with potentially out-of-date block reports", b, dn,
nr.replicasOnStaleNodes());
- postponeBlock(b.getCorrupted());
+ postponeBlock(b.corrupted);
return false;
} else if (nr.liveReplicas() >= 1) {
// If we have at least one copy on a live node, then we can delete it.
- addToInvalidates(b.getCorrupted(), dn);
- removeStoredBlock(b.getStored(), node);
+ addToInvalidates(b.corrupted, dn);
+ removeStoredBlock(b.stored, node);
blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
b, dn);
return true;
@@ -1338,18 +1338,71 @@ public class BlockManager implements BlockStatsMXBean {
*/
@VisibleForTesting
int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
+ int requiredReplication, numEffectiveReplicas;
+ List<DatanodeDescriptor> containingNodes;
+ DatanodeDescriptor srcNode;
+ BlockCollection bc = null;
+ int additionalReplRequired;
+
int scheduledWork = 0;
- final List<ReplicationWork> work = new LinkedList<>();
+ List<ReplicationWork> work = new LinkedList<ReplicationWork>();
namesystem.writeLock();
try {
synchronized (neededReplications) {
for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
for (BlockInfo block : blocksToReplicate.get(priority)) {
- ReplicationWork rw = scheduleReplication(block, priority);
- if (rw != null) {
- work.add(rw);
+ // block should belong to a file
+ bc = getBlockCollection(block);
+ // abandoned block or block reopened for append
+ if (bc == null
+ || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+ // remove from neededReplications
+ neededReplications.remove(block, priority);
+ continue;
+ }
+
+ requiredReplication = getExpectedReplicaNum(block);
+
+ // get a source data-node
+ containingNodes = new ArrayList<DatanodeDescriptor>();
+ List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
+ NumberReplicas numReplicas = new NumberReplicas();
+ srcNode = chooseSourceDatanode(
+ block, containingNodes, liveReplicaNodes, numReplicas,
+ priority);
+ if(srcNode == null) { // block can not be replicated from any node
+ LOG.debug("Block " + block + " cannot be repl from any node");
+ continue;
+ }
+
+ // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
+ // not included in the numReplicas.liveReplicas() count
+ assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
+
+ // do not schedule more if enough replicas is already pending
+ numEffectiveReplicas = numReplicas.liveReplicas() +
+ pendingReplications.getNumReplicas(block);
+
+ if (numEffectiveReplicas >= requiredReplication) {
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
+ (blockHasEnoughRacks(block)) ) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+ " it has enough replicas", block);
+ continue;
+ }
+ }
+
+ if (numReplicas.liveReplicas() < requiredReplication) {
+ additionalReplRequired = requiredReplication
+ - numEffectiveReplicas;
+ } else {
+ additionalReplRequired = 1; // Needed on a new rack
}
+ work.add(new ReplicationWork(block, bc, srcNode,
+ containingNodes, liveReplicaNodes, additionalReplRequired,
+ priority));
}
}
}
@@ -1357,12 +1410,12 @@ public class BlockManager implements BlockStatsMXBean {
namesystem.writeUnlock();
}
- final Set<Node> excludedNodes = new HashSet<>();
+ final Set<Node> excludedNodes = new HashSet<Node>();
for(ReplicationWork rw : work){
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
excludedNodes.clear();
- for (DatanodeDescriptor dn : rw.getContainingNodes()) {
+ for (DatanodeDescriptor dn : rw.containingNodes) {
excludedNodes.add(dn);
}
@@ -1375,15 +1428,67 @@ public class BlockManager implements BlockStatsMXBean {
namesystem.writeLock();
try {
for(ReplicationWork rw : work){
- final DatanodeStorageInfo[] targets = rw.getTargets();
+ final DatanodeStorageInfo[] targets = rw.targets;
if(targets == null || targets.length == 0){
- rw.resetTargets();
+ rw.targets = null;
continue;
}
synchronized (neededReplications) {
- if (validateReplicationWork(rw)) {
- scheduledWork++;
+ BlockInfo block = rw.block;
+ int priority = rw.priority;
+ // Recheck since global lock was released
+ // block should belong to a file
+ bc = getBlockCollection(block);
+ // abandoned block or block reopened for append
+ if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ rw.targets = null;
+ continue;
+ }
+ requiredReplication = getExpectedReplicaNum(block);
+
+ // do not schedule more if enough replicas is already pending
+ NumberReplicas numReplicas = countNodes(block);
+ numEffectiveReplicas = numReplicas.liveReplicas() +
+ pendingReplications.getNumReplicas(block);
+
+ if (numEffectiveReplicas >= requiredReplication) {
+ if ( (pendingReplications.getNumReplicas(block) > 0) ||
+ (blockHasEnoughRacks(block)) ) {
+ neededReplications.remove(block, priority); // remove from neededReplications
+ rw.targets = null;
+ blockLog.debug("BLOCK* Removing {} from neededReplications as" +
+ " it has enough replicas", block);
+ continue;
+ }
+ }
+
+ if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+ (!blockHasEnoughRacks(block)) ) {
+ if (rw.srcNode.getNetworkLocation().equals(
+ targets[0].getDatanodeDescriptor().getNetworkLocation())) {
+ //No use continuing, unless a new rack in this case
+ continue;
+ }
+ }
+
+ // Add block to the to be replicated list
+ rw.srcNode.addBlockToBeReplicated(block, targets);
+ scheduledWork++;
+ DatanodeStorageInfo.incrementBlocksScheduled(targets);
+
+ // Move the block-replication into a "pending" state.
+ // The reason we use 'pending' is so we can retry
+ // replications that fail after an appropriate amount of time.
+ pendingReplications.increment(block,
+ DatanodeStorageInfo.toDatanodeDescriptors(targets));
+ blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+ + "pendingReplications", block);
+
+ // remove from neededReplications
+ if(numEffectiveReplicas + targets.length >= requiredReplication) {
+ neededReplications.remove(block, priority); // remove from neededReplications
}
}
}
@@ -1394,15 +1499,15 @@ public class BlockManager implements BlockStatsMXBean {
if (blockLog.isInfoEnabled()) {
// log which blocks have been scheduled for replication
for(ReplicationWork rw : work){
- DatanodeStorageInfo[] targets = rw.getTargets();
+ DatanodeStorageInfo[] targets = rw.targets;
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
- for (DatanodeStorageInfo target : targets) {
+ for (int k = 0; k < targets.length; k++) {
targetList.append(' ');
- targetList.append(target.getDatanodeDescriptor());
+ targetList.append(targets[k].getDatanodeDescriptor());
}
- blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(),
- rw.getBlock(), targetList);
+ blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
+ rw.block, targetList);
}
}
}
@@ -1414,118 +1519,6 @@ public class BlockManager implements BlockStatsMXBean {
return scheduledWork;
}
- boolean hasEnoughEffectiveReplicas(BlockInfo block,
- NumberReplicas numReplicas, int pendingReplicaNum, int required) {
- int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
- return (numEffectiveReplicas >= required) &&
- (pendingReplicaNum > 0 || blockHasEnoughRacks(block));
- }
-
- private ReplicationWork scheduleReplication(BlockInfo block, int priority) {
- // block should belong to a file
- BlockCollection bc = getBlockCollection(block);
- // abandoned block or block reopened for append
- if (bc == null
- || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
- // remove from neededReplications
- neededReplications.remove(block, priority);
- return null;
- }
-
- short requiredReplication = getExpectedReplicaNum(block);
-
- // get a source data-node
- List<DatanodeDescriptor> containingNodes = new ArrayList<>();
- List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
- NumberReplicas numReplicas = new NumberReplicas();
- DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes,
- liveReplicaNodes, numReplicas, priority);
- if (srcNode == null) { // block can not be replicated from any node
- LOG.debug("Block " + block + " cannot be repl from any node");
- return null;
- }
-
- // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
- // not included in the numReplicas.liveReplicas() count
- assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
-
- int pendingNum = pendingReplications.getNumReplicas(block);
- if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
- requiredReplication)) {
- neededReplications.remove(block, priority);
- blockLog.debug("BLOCK* Removing {} from neededReplications as" +
- " it has enough replicas", block);
- return null;
- }
-
- final int additionalReplRequired;
- if (numReplicas.liveReplicas() < requiredReplication) {
- additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
- - pendingNum;
- } else {
- additionalReplRequired = 1; // Needed on a new rack
- }
- return new ReplicationWork(block, bc, srcNode, containingNodes,
- liveReplicaNodes, additionalReplRequired, priority);
- }
-
- private boolean validateReplicationWork(ReplicationWork rw) {
- BlockInfo block = rw.getBlock();
- int priority = rw.getPriority();
- // Recheck since global lock was released
- // block should belong to a file
- BlockCollection bc = getBlockCollection(block);
- // abandoned block or block reopened for append
- if(bc == null
- || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
- neededReplications.remove(block, priority);
- rw.resetTargets();
- return false;
- }
-
- // do not schedule more if enough replicas is already pending
- final short requiredReplication = getExpectedReplicaNum(block);
- NumberReplicas numReplicas = countNodes(block);
- final int pendingNum = pendingReplications.getNumReplicas(block);
- if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
- requiredReplication)) {
- neededReplications.remove(block, priority);
- rw.resetTargets();
- blockLog.debug("BLOCK* Removing {} from neededReplications as" +
- " it has enough replicas", block);
- return false;
- }
-
- DatanodeStorageInfo[] targets = rw.getTargets();
- if ( (numReplicas.liveReplicas() >= requiredReplication) &&
- (!blockHasEnoughRacks(block)) ) {
- if (rw.getSrcNode().getNetworkLocation().equals(
- targets[0].getDatanodeDescriptor().getNetworkLocation())) {
- //No use continuing, unless a new rack in this case
- return false;
- }
- }
-
- // Add block to the to be replicated list
- rw.getSrcNode().addBlockToBeReplicated(block, targets);
- DatanodeStorageInfo.incrementBlocksScheduled(targets);
-
- // Move the block-replication into a "pending" state.
- // The reason we use 'pending' is so we can retry
- // replications that fail after an appropriate amount of time.
- pendingReplications.increment(block,
- DatanodeStorageInfo.toDatanodeDescriptors(targets));
- blockLog.debug("BLOCK* block {} is moved from neededReplications to "
- + "pendingReplications", block);
-
- int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
- // remove from neededReplications
- if(numEffectiveReplicas + targets.length >= requiredReplication) {
- neededReplications.remove(block, priority);
- }
- return true;
- }
-
/** Choose target for WebHDFS redirection. */
public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
@@ -1772,6 +1765,52 @@ public class BlockManager implements BlockStatsMXBean {
this.reportedState = reportedState;
}
}
+
+ /**
+ * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+ * list of blocks that should be considered corrupt due to a block report.
+ */
+ private static class BlockToMarkCorrupt {
+ /** The corrupted block in a datanode. */
+ final BlockInfo corrupted;
+ /** The corresponding block stored in the BlockManager. */
+ final BlockInfo stored;
+ /** The reason to mark corrupt. */
+ final String reason;
+ /** The reason code to be stored */
+ final Reason reasonCode;
+
+ BlockToMarkCorrupt(BlockInfo corrupted,
+ BlockInfo stored, String reason,
+ Reason reasonCode) {
+ Preconditions.checkNotNull(corrupted, "corrupted is null");
+ Preconditions.checkNotNull(stored, "stored is null");
+
+ this.corrupted = corrupted;
+ this.stored = stored;
+ this.reason = reason;
+ this.reasonCode = reasonCode;
+ }
+
+ BlockToMarkCorrupt(BlockInfo stored, String reason,
+ Reason reasonCode) {
+ this(stored, stored, reason, reasonCode);
+ }
+
+ BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+ Reason reasonCode) {
+ this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
+ reason, reasonCode);
+ //the corrupted block in datanode has a different generation stamp
+ corrupted.setGenerationStamp(gs);
+ }
+
+ @Override
+ public String toString() {
+ return corrupted + "("
+ + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
+ }
+ }
/**
* The given storage is reporting all its blocks.
@@ -3758,6 +3797,51 @@ public class BlockManager implements BlockStatsMXBean {
null);
}
+ private static class ReplicationWork {
+
+ private final BlockInfo block;
+ private final BlockCollection bc;
+
+ private final DatanodeDescriptor srcNode;
+ private final List<DatanodeDescriptor> containingNodes;
+ private final List<DatanodeStorageInfo> liveReplicaStorages;
+ private final int additionalReplRequired;
+
+ private DatanodeStorageInfo targets[];
+ private final int priority;
+
+ public ReplicationWork(BlockInfo block,
+ BlockCollection bc,
+ DatanodeDescriptor srcNode,
+ List<DatanodeDescriptor> containingNodes,
+ List<DatanodeStorageInfo> liveReplicaStorages,
+ int additionalReplRequired,
+ int priority) {
+ this.block = block;
+ this.bc = bc;
+ this.srcNode = srcNode;
+ this.srcNode.incrementPendingReplicationWithoutTargets();
+ this.containingNodes = containingNodes;
+ this.liveReplicaStorages = liveReplicaStorages;
+ this.additionalReplRequired = additionalReplRequired;
+ this.priority = priority;
+ this.targets = null;
+ }
+
+ private void chooseTargets(BlockPlacementPolicy blockplacement,
+ BlockStoragePolicySuite storagePolicySuite,
+ Set<Node> excludedNodes) {
+ try {
+ targets = blockplacement.chooseTarget(bc.getName(),
+ additionalReplRequired, srcNode, liveReplicaStorages, false,
+ excludedNodes, block.getNumBytes(),
+ storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+ } finally {
+ srcNode.decrementPendingReplicationWithoutTargets();
+ }
+ }
+ }
+
/**
* A simple result enum for the result of
* {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
@@ -3771,9 +3855,9 @@ public class BlockManager implements BlockStatsMXBean {
OVER_REPLICATED,
/** A decision can't currently be made about this block. */
POSTPONE,
- /** The block is under construction, so should be ignored. */
+ /** The block is under construction, so should be ignored */
UNDER_CONSTRUCTION,
- /** The block is properly replicated. */
+ /** The block is properly replicated */
OK
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/035ed261/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
deleted file mode 100644
index 3842e56..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
-
-import com.google.common.base.Preconditions;
-
-/**
- * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
- * list of blocks that should be considered corrupt due to a block report.
- */
-class BlockToMarkCorrupt {
- /** The corrupted block in a datanode. */
- private final BlockInfo corrupted;
- /** The corresponding block stored in the BlockManager. */
- private final BlockInfo stored;
- /** The reason to mark corrupt. */
- private final String reason;
- /** The reason code to be stored */
- private final CorruptReplicasMap.Reason reasonCode;
-
- BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
- CorruptReplicasMap.Reason reasonCode) {
- Preconditions.checkNotNull(corrupted, "corrupted is null");
- Preconditions.checkNotNull(stored, "stored is null");
-
- this.corrupted = corrupted;
- this.stored = stored;
- this.reason = reason;
- this.reasonCode = reasonCode;
- }
-
- BlockToMarkCorrupt(BlockInfo stored, String reason,
- CorruptReplicasMap.Reason reasonCode) {
- this(stored, stored, reason, reasonCode);
- }
-
- BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
- CorruptReplicasMap.Reason reasonCode) {
- this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored,
- reason, reasonCode);
- //the corrupted block in datanode has a different generation stamp
- corrupted.setGenerationStamp(gs);
- }
-
- public boolean isCorruptedDuringWrite() {
- return stored.getGenerationStamp() > corrupted.getGenerationStamp();
- }
-
- public BlockInfo getCorrupted() {
- return corrupted;
- }
-
- public BlockInfo getStored() {
- return stored;
- }
-
- public String getReason() {
- return reason;
- }
-
- public Reason getReasonCode() {
- return reasonCode;
- }
-
- @Override
- public String toString() {
- return corrupted + "("
- + (corrupted == stored ? "same as stored": "stored=" + stored) + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/035ed261/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
deleted file mode 100644
index f8a6dad..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import org.apache.hadoop.net.Node;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-class ReplicationWork {
- private final BlockInfo block;
- private final BlockCollection bc;
- private final DatanodeDescriptor srcNode;
- private final int additionalReplRequired;
- private final int priority;
- private final List<DatanodeDescriptor> containingNodes;
- private final List<DatanodeStorageInfo> liveReplicaStorages;
- private DatanodeStorageInfo[] targets;
-
- public ReplicationWork(BlockInfo block, BlockCollection bc,
- DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes,
- List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
- int priority) {
- this.block = block;
- this.bc = bc;
- this.srcNode = srcNode;
- this.srcNode.incrementPendingReplicationWithoutTargets();
- this.containingNodes = containingNodes;
- this.liveReplicaStorages = liveReplicaStorages;
- this.additionalReplRequired = additionalReplRequired;
- this.priority = priority;
- this.targets = null;
- }
-
- void chooseTargets(BlockPlacementPolicy blockplacement,
- BlockStoragePolicySuite storagePolicySuite,
- Set<Node> excludedNodes) {
- try {
- targets = blockplacement.chooseTarget(bc.getName(),
- additionalReplRequired, srcNode, liveReplicaStorages, false,
- excludedNodes, block.getNumBytes(),
- storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
- } finally {
- srcNode.decrementPendingReplicationWithoutTargets();
- }
- }
-
- DatanodeStorageInfo[] getTargets() {
- return targets;
- }
-
- void resetTargets() {
- this.targets = null;
- }
-
- List<DatanodeDescriptor> getContainingNodes() {
- return Collections.unmodifiableList(containingNodes);
- }
-
- public int getPriority() {
- return priority;
- }
-
- public BlockInfo getBlock() {
- return block;
- }
-
- public DatanodeDescriptor getSrcNode() {
- return srcNode;
- }
-}