Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/18 23:53:13 UTC
hadoop git commit: HDFS-7369. Erasure coding: distribute recovery work for striped blocks to DataNode. Contributed by Zhe Zhang.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 7d6043869 -> 5c6a774ff
HDFS-7369. Erasure coding: distribute recovery work for striped blocks to DataNode. Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c6a774f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c6a774f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c6a774f
Branch: refs/heads/HDFS-7285
Commit: 5c6a774ff712749e961c5a4a74371800d7144e07
Parents: 7d60438
Author: Zhe Zhang <zh...@apache.org>
Authored: Wed Mar 18 15:52:36 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Wed Mar 18 15:52:36 2015 -0700
----------------------------------------------------------------------
.../server/blockmanagement/BlockCollection.java | 5 +
.../server/blockmanagement/BlockManager.java | 296 +++++++++++++------
.../blockmanagement/DatanodeDescriptor.java | 72 ++++-
.../server/blockmanagement/DatanodeManager.java | 20 +-
.../hadoop/hdfs/server/namenode/INodeFile.java | 9 +-
.../server/protocol/BlockECRecoveryCommand.java | 63 ++++
.../hdfs/server/protocol/DatanodeProtocol.java | 1 +
.../blockmanagement/BlockManagerTestUtil.java | 2 +-
.../blockmanagement/TestBlockManager.java | 22 +-
.../TestRecoverStripedBlocks.java | 107 +++++++
10 files changed, 486 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
index 1c753de..62a5781 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
@@ -86,4 +86,9 @@ public interface BlockCollection {
* @return whether the block collection is under construction.
*/
public boolean isUnderConstruction();
+
+ /**
+ * @return whether the block collection is in striping format
+ */
+ public boolean isStriped();
}
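The new isStriped() accessor is what lets the BlockManager decide how many recovery sources a block needs. Below is a minimal, self-contained sketch of that decision; the interface and the 6+3 schema constants are placeholders (the patch reads the real values from HdfsConstants), not the actual HDFS types.

// Illustrative sketch only; names and constants are placeholders.
interface BlockCollectionSketch {
  boolean isStriped();
}

class RecoverySourceCount {
  // Assumed 6+3 schema; the patch uses HdfsConstants.NUM_DATA_BLOCKS / NUM_PARITY_BLOCKS.
  static final int NUM_DATA_BLOCKS = 6;
  static final int NUM_PARITY_BLOCKS = 3;

  // A replicated block needs a single copy source; a striped group needs a
  // full set of data blocks so the missing ones can be recomputed.
  static int numSourceNodes(BlockCollectionSketch bc) {
    return bc.isStriped() ? NUM_DATA_BLOCKS : 1;
  }

  public static void main(String[] args) {
    System.out.println(numSourceNodes(() -> true));   // 6 for a striped file
    System.out.println(numSourceNodes(() -> false));  // 1 for a replicated file
  }
}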
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index bb28343..aed8daf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
@@ -530,9 +531,9 @@ public class BlockManager {
NumberReplicas numReplicas = new NumberReplicas();
// source node returned is not used
- chooseSourceDatanode(block, containingNodes,
+ chooseSourceDatanodes(getStoredBlock(block), containingNodes,
containingLiveReplicasNodes, numReplicas,
- UnderReplicatedBlocks.LEVEL);
+ null, 1, UnderReplicatedBlocks.LEVEL);
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
@@ -1330,15 +1331,15 @@ public class BlockManager {
}
/**
- * Scan blocks in {@link #neededReplications} and assign replication
- * work to data-nodes they belong to.
+ * Scan blocks in {@link #neededReplications} and assign recovery
+ * (replication or erasure coding) work to data-nodes they belong to.
*
* The number of blocks to process equals either twice the number of live
* data-nodes or the number of under-replicated blocks, whichever is less.
*
* @return number of blocks scheduled for replication during this iteration.
*/
- int computeReplicationWork(int blocksToProcess) {
+ int computeBlockRecoveryWork(int blocksToProcess) {
List<List<BlockInfo>> blocksToReplicate = null;
namesystem.writeLock();
try {
@@ -1348,30 +1349,32 @@ public class BlockManager {
} finally {
namesystem.writeUnlock();
}
- return computeReplicationWorkForBlocks(blocksToReplicate);
+ return computeRecoveryWorkForBlocks(blocksToReplicate);
}
- /** Replicate a set of blocks
+ /**
+ * Recover a set of blocks to full strength through replication or
+ * erasure coding
*
- * @param blocksToReplicate blocks to be replicated, for each priority
+ * @param blocksToRecover blocks to be recovered, for each priority
* @return the number of blocks scheduled for replication
*/
@VisibleForTesting
- int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
+ int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes;
- DatanodeDescriptor srcNode;
BlockCollection bc = null;
int additionalReplRequired;
int scheduledWork = 0;
- List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+ List<BlockRecoveryWork> recovWork = new LinkedList<>();
+ // Step 1: categorize at-risk blocks into replication and EC tasks
namesystem.writeLock();
try {
synchronized (neededReplications) {
- for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
- for (BlockInfo block : blocksToReplicate.get(priority)) {
+ for (int priority = 0; priority < blocksToRecover.size(); priority++) {
+ for (BlockInfo block : blocksToRecover.get(priority)) {
// block should belong to a file
bc = blocksMap.getBlockCollection(block);
// abandoned block or block reopened for append
@@ -1385,25 +1388,31 @@ public class BlockManager {
requiredReplication = bc.getBlockReplication();
// get a source data-node
- containingNodes = new ArrayList<DatanodeDescriptor>();
- List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
+ containingNodes = new ArrayList<>();
+ List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas();
- srcNode = chooseSourceDatanode(
+ List<Short> missingBlockIndices = new LinkedList<>();
+ DatanodeDescriptor[] srcNodes;
+ int numSourceNodes = bc.isStriped() ?
+ HdfsConstants.NUM_DATA_BLOCKS : 1;
+ srcNodes = chooseSourceDatanodes(
block, containingNodes, liveReplicaNodes, numReplicas,
- priority);
- if(srcNode == null) { // block can not be replicated from any node
- LOG.debug("Block " + block + " cannot be repl from any node");
+ missingBlockIndices, numSourceNodes, priority);
+ if(srcNodes == null || srcNodes.length == 0) {
+ // block can not be replicated from any node
+ LOG.debug("Block " + block + " cannot be recovered " +
+ "from any node");
continue;
}
- // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
+ // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
// do not schedule more if enough replicas is already pending
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
-
+
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
@@ -1420,9 +1429,21 @@ public class BlockManager {
} else {
additionalReplRequired = 1; // Needed on a new rack
}
- work.add(new ReplicationWork(block, bc, srcNode,
- containingNodes, liveReplicaNodes, additionalReplRequired,
- priority));
+ if (bc.isStriped()) {
+ ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes,
+ containingNodes, liveReplicaNodes, additionalReplRequired,
+ priority);
+ short[] missingBlockArray = new short[missingBlockIndices.size()];
+ for (int i = 0 ; i < missingBlockIndices.size(); i++) {
+ missingBlockArray[i] = missingBlockIndices.get(i);
+ }
+ ecw.setMissingBlockIndices(missingBlockArray);
+ recovWork.add(ecw);
+ } else {
+ recovWork.add(new ReplicationWork(block, bc, srcNodes,
+ containingNodes, liveReplicaNodes, additionalReplRequired,
+ priority));
+ }
}
}
}
@@ -1430,8 +1451,9 @@ public class BlockManager {
namesystem.writeUnlock();
}
+ // Step 2: choose target nodes for each recovery task
final Set<Node> excludedNodes = new HashSet<Node>();
- for(ReplicationWork rw : work){
+ for(BlockRecoveryWork rw : recovWork){
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
excludedNodes.clear();
@@ -1445,9 +1467,10 @@ public class BlockManager {
rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
}
+ // Step 3: add tasks to the DN
namesystem.writeLock();
try {
- for(ReplicationWork rw : work){
+ for(BlockRecoveryWork rw : recovWork){
final DatanodeStorageInfo[] targets = rw.targets;
if(targets == null || targets.length == 0){
rw.targets = null;
@@ -1486,7 +1509,7 @@ public class BlockManager {
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
(!blockHasEnoughRacks(block)) ) {
- if (rw.srcNode.getNetworkLocation().equals(
+ if (rw.srcNodes[0].getNetworkLocation().equals(
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
//No use continuing, unless a new rack in this case
continue;
@@ -1494,7 +1517,17 @@ public class BlockManager {
}
// Add block to the to be replicated list
- rw.srcNode.addBlockToBeReplicated(block, targets);
+ if (bc.isStriped()) {
+ assert rw instanceof ErasureCodingWork;
+ assert rw.targets.length > 0;
+ rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+ new ExtendedBlock(namesystem.getBlockPoolId(), block),
+ rw.srcNodes, rw.targets,
+ ((ErasureCodingWork)rw).getMissingBlockIndicies());
+ }
+ else {
+ rw.srcNodes[0].addBlockToBeReplicated(block, targets);
+ }
scheduledWork++;
DatanodeStorageInfo.incrementBlocksScheduled(targets);
@@ -1518,7 +1551,7 @@ public class BlockManager {
if (blockLog.isInfoEnabled()) {
// log which blocks have been scheduled for replication
- for(ReplicationWork rw : work){
+ for(BlockRecoveryWork rw : recovWork){
DatanodeStorageInfo[] targets = rw.targets;
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
@@ -1526,7 +1559,7 @@ public class BlockManager {
targetList.append(' ');
targetList.append(targets[k].getDatanodeDescriptor());
}
- blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
+ blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
rw.block, targetList);
}
}
@@ -1610,54 +1643,65 @@ public class BlockManager {
}
/**
- * Parse the data-nodes the block belongs to and choose one,
- * which will be the replication source.
+ * Parse the data-nodes the block belongs to and choose a certain number
+ * from them to be the recovery sources.
*
* We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
* since the former do not have write traffic and hence are less busy.
* We do not use already decommissioned nodes as a source.
- * Otherwise we choose a random node among those that did not reach their
- * replication limits. However, if the replication is of the highest priority
- * and all nodes have reached their replication limits, we will choose a
- * random node despite the replication limit.
+ * Otherwise we randomly choose nodes among those that did not reach their
+ * replication limits. However, if the recovery work is of the highest
+ * priority and all nodes have reached their replication limits, we will
+ * randomly choose the desired number of nodes despite the replication limit.
*
* In addition form a list of all nodes containing the block
* and calculate its replication numbers.
*
* @param block Block for which a replication source is needed
- * @param containingNodes List to be populated with nodes found to contain the
- * given block
- * @param nodesContainingLiveReplicas List to be populated with nodes found to
- * contain live replicas of the given block
- * @param numReplicas NumberReplicas instance to be initialized with the
- * counts of live, corrupt, excess, and
- * decommissioned replicas of the given
- * block.
+ * @param containingNodes List to be populated with nodes found to contain
+ * the given block
+ * @param nodesContainingLiveReplicas List to be populated with nodes found
+ * to contain live replicas of the given
+ * block
+ * @param numReplicas NumberReplicas instance to be initialized with the
+ * counts of live, corrupt, excess, and decommissioned
+ * replicas of the given block.
+ * @param missingBlockIndices List to be populated with indices of missing
+ * blocks in a striped block group or missing
+ * replicas of a replicated block
+ * @param numSourceNodes integer specifying the number of source nodes to
+ * choose
* @param priority integer representing replication priority of the given
* block
- * @return the DatanodeDescriptor of the chosen node from which to replicate
- * the given block
- */
- @VisibleForTesting
- DatanodeDescriptor chooseSourceDatanode(Block block,
- List<DatanodeDescriptor> containingNodes,
- List<DatanodeStorageInfo> nodesContainingLiveReplicas,
- NumberReplicas numReplicas,
- int priority) {
+ * @return the array of DatanodeDescriptor of the chosen nodes from which to
+ * recover the given block
+ */
+ @VisibleForTesting
+ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
+ List<DatanodeDescriptor> containingNodes,
+ List<DatanodeStorageInfo> nodesContainingLiveReplicas,
+ NumberReplicas numReplicas,
+ List<Short> missingBlockIndices, int numSourceNodes, int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
- DatanodeDescriptor srcNode = null;
+ LinkedList<DatanodeDescriptor> srcNodes = new LinkedList<>();
int live = 0;
int decommissioned = 0;
int corrupt = 0;
int excess = 0;
-
+ missingBlockIndices.clear();
+ Set<Short> healthyIndices = new HashSet<>();
+
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+ if (block.isStriped()) {
+ healthyIndices.add((short) ((BlockInfoStriped) block).
+ getStorageBlockIndex(storage));
+ }
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
LightWeightLinkedSet<Block> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid());
- int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
+ int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt += countableReplica;
else if (node.isDecommissionInProgress() || node.isDecommissioned())
@@ -1689,21 +1733,32 @@ public class BlockManager {
if(node.isDecommissioned())
continue;
// we prefer nodes that are in DECOMMISSION_INPROGRESS state
- if(node.isDecommissionInProgress() || srcNode == null) {
- srcNode = node;
+ if(node.isDecommissionInProgress() || srcNodes.size() < numSourceNodes) {
+ srcNodes.add(node);
continue;
}
- if(srcNode.isDecommissionInProgress())
- continue;
// switch to a different node randomly
// this to prevent from deterministically selecting the same node even
// if the node failed to replicate the block on previous iterations
- if(DFSUtil.getRandom().nextBoolean())
- srcNode = node;
+ if(DFSUtil.getRandom().nextBoolean()) {
+ int pos = DFSUtil.getRandom().nextInt(numSourceNodes);
+ if(!srcNodes.get(pos).isDecommissionInProgress()) {
+ srcNodes.set(pos, node);
+ }
+ }
+ }
+ if (block.isStriped()) {
+ for (short i = 0; i < HdfsConstants.NUM_DATA_BLOCKS +
+ HdfsConstants.NUM_PARITY_BLOCKS; i++) {
+ if (!healthyIndices.contains(i)) {
+ missingBlockIndices.add(i);
+ }
+ }
}
- if(numReplicas != null)
+ if(numReplicas != null) {
numReplicas.initialize(live, decommissioned, corrupt, excess, 0);
- return srcNode;
+ }
+ return srcNodes.toArray(new DatanodeDescriptor[numSourceNodes]);
}
/**
@@ -1733,7 +1788,7 @@ public class BlockManager {
*/
}
}
-
+
/**
* StatefulBlockInfo is used to build the "toUC" list, which is a list of
* updates to the information about under-construction blocks.
@@ -3537,7 +3592,7 @@ public class BlockManager {
}
/**
- * Periodically calls computeReplicationWork().
+ * Periodically calls computeBlockRecoveryWork().
*/
private class ReplicationMonitor implements Runnable {
@@ -3595,7 +3650,7 @@ public class BlockManager {
final int nodesToProcess = (int) Math.ceil(numlive
* this.blocksInvalidateWorkPct);
- int workFound = this.computeReplicationWork(blocksToProcess);
+ int workFound = this.computeBlockRecoveryWork(blocksToProcess);
// Update counters
namesystem.writeLock();
@@ -3622,49 +3677,118 @@ public class BlockManager {
postponedMisreplicatedBlocks.clear();
postponedMisreplicatedBlocksCount.set(0);
}
-
-
- private static class ReplicationWork {
- private final BlockInfo block;
- private final BlockCollection bc;
+ /**
+ * This class is used internally by {@link #computeRecoveryWorkForBlocks}
+ * to represent a task to recover a block through replication or erasure
+ * coding. Recovery is done by transferring data from {@link #srcNodes} to
+ * {@link #targets}.
+ */
+ private static class BlockRecoveryWork {
+ protected final BlockInfo block;
+ protected final BlockCollection bc;
- private final DatanodeDescriptor srcNode;
- private final List<DatanodeDescriptor> containingNodes;
- private final List<DatanodeStorageInfo> liveReplicaStorages;
- private final int additionalReplRequired;
+ /**
+ * An erasure coding recovery task has multiple source nodes; a replication
+ * task has a single source node, stored as the first element of the array.
+ */
+ protected final DatanodeDescriptor[] srcNodes;
+ /** Nodes containing the block; avoid them in choosing new targets */
+ protected final List<DatanodeDescriptor> containingNodes;
+ /** Required by {@link BlockPlacementPolicy#chooseTarget} */
+ protected final List<DatanodeStorageInfo> liveReplicaStorages;
+ protected final int additionalReplRequired;
- private DatanodeStorageInfo targets[];
- private final int priority;
+ protected DatanodeStorageInfo[] targets;
+ protected final int priority;
- public ReplicationWork(BlockInfo block,
+ public BlockRecoveryWork(BlockInfo block,
BlockCollection bc,
- DatanodeDescriptor srcNode,
+ DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired,
int priority) {
this.block = block;
this.bc = bc;
- this.srcNode = srcNode;
- this.srcNode.incrementPendingReplicationWithoutTargets();
+ this.srcNodes = srcNodes;
this.containingNodes = containingNodes;
this.liveReplicaStorages = liveReplicaStorages;
this.additionalReplRequired = additionalReplRequired;
this.priority = priority;
this.targets = null;
}
-
- private void chooseTargets(BlockPlacementPolicy blockplacement,
+
+ protected void chooseTargets(BlockPlacementPolicy blockplacement,
+ BlockStoragePolicySuite storagePolicySuite,
+ Set<Node> excludedNodes) {
+ }
+ }
+
+ private static class ReplicationWork extends BlockRecoveryWork {
+
+ public ReplicationWork(BlockInfo block,
+ BlockCollection bc,
+ DatanodeDescriptor[] srcNodes,
+ List<DatanodeDescriptor> containingNodes,
+ List<DatanodeStorageInfo> liveReplicaStorages,
+ int additionalReplRequired,
+ int priority) {
+ super(block, bc, srcNodes, containingNodes,
+ liveReplicaStorages, additionalReplRequired, priority);
+ LOG.debug("Creating a ReplicationWork to recover " + block);
+ }
+
+ protected void chooseTargets(BlockPlacementPolicy blockplacement,
+ BlockStoragePolicySuite storagePolicySuite,
+ Set<Node> excludedNodes) {
+ assert srcNodes.length > 0
+ : "At least 1 source node should have been selected";
+ try {
+ targets = blockplacement.chooseTarget(bc.getName(),
+ additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
+ excludedNodes, block.getNumBytes(),
+ storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+ } finally {
+ srcNodes[0].decrementPendingReplicationWithoutTargets();
+ }
+ }
+ }
+
+ private static class ErasureCodingWork extends BlockRecoveryWork {
+
+ private short[] missingBlockIndicies = null;
+
+ public ErasureCodingWork(BlockInfo block,
+ BlockCollection bc,
+ DatanodeDescriptor[] srcNodes,
+ List<DatanodeDescriptor> containingNodes,
+ List<DatanodeStorageInfo> liveReplicaStorages,
+ int additionalReplRequired,
+ int priority) {
+ super(block, bc, srcNodes, containingNodes,
+ liveReplicaStorages, additionalReplRequired, priority);
+ LOG.debug("Creating an ErasureCodingWork to recover " + block);
+ }
+
+ public short[] getMissingBlockIndicies() {
+ return missingBlockIndicies;
+ }
+
+ public void setMissingBlockIndices(short[] missingBlockIndicies) {
+ this.missingBlockIndicies = missingBlockIndicies;
+ }
+
+ protected void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes) {
try {
+ // TODO: new placement policy for EC considering multiple writers
targets = blockplacement.chooseTarget(bc.getName(),
- additionalReplRequired, srcNode, liveReplicaStorages, false,
+ additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
excludedNodes, block.getNumBytes(),
storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
} finally {
- srcNode.decrementPendingReplicationWithoutTargets();
}
}
}
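Besides choosing sources, the reworked chooseSourceDatanodes() reports which indices of a striped block group are absent: it records the indices it can still find a storage for, then diffs them against the full group. A self-contained sketch of that diff follows; the schema constants and the input set are illustrative, not the BlockManager internals.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MissingIndexSketch {
  // Assumed 6+3 schema, standing in for HdfsConstants.
  static final short NUM_DATA_BLOCKS = 6;
  static final short NUM_PARITY_BLOCKS = 3;

  /** Given the indices that still have a healthy block, return the indices
   *  that must be reconstructed. */
  static List<Short> missingIndices(Set<Short> healthyIndices) {
    List<Short> missing = new ArrayList<>();
    for (short i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
      if (!healthyIndices.contains(i)) {
        missing.add(i);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    Set<Short> healthy = new HashSet<>();
    for (short i : new short[] {0, 1, 2, 4, 5, 6, 7, 8}) {
      healthy.add(i);
    }
    System.out.println(missingIndices(healthy)); // [3]: block #3 needs recovery
  }
}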
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 86f4158..c5d1f84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.Arrays;
import com.google.common.annotations.VisibleForTesting;
@@ -39,6 +40,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -76,6 +78,33 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
+ /** Block, source and target nodes, and missing block indices of an EC recovery task */
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ public static class BlockECRecoveryInfo {
+ public final ExtendedBlock block;
+ public final DatanodeDescriptor[] sources;
+ public final DatanodeStorageInfo[] targets;
+ public final short[] missingBlockIndices;
+
+ BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources,
+ DatanodeStorageInfo[] targets, short[] missingBlockIndices) {
+ this.block = block;
+ this.sources = sources;
+ this.targets = targets;
+ this.missingBlockIndices = missingBlockIndices;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("BlockECRecoveryInfo(\n ").
+ append("Recovering ").append(block).
+ append(" From: ").append(Arrays.asList(sources)).
+ append(" To: ").append(Arrays.asList(targets)).append(")\n").
+ toString();
+ }
+ }
+
/** A BlockTargetPair queue. */
private static class BlockQueue<E> {
private final Queue<E> blockq = new LinkedList<E>();
@@ -196,12 +225,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
private long bandwidth;
/** A queue of blocks to be replicated by this datanode */
- private final BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
+ private final BlockQueue<BlockTargetPair> replicateBlocks =
+ new BlockQueue<>();
+ /** A queue of blocks to be erasure coded by this datanode */
+ private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
+ new BlockQueue<>();
/** A queue of blocks to be recovered by this datanode */
- private final BlockQueue<BlockInfoContiguousUnderConstruction> recoverBlocks =
- new BlockQueue<BlockInfoContiguousUnderConstruction>();
+ private final BlockQueue<BlockInfoContiguousUnderConstruction>
+ recoverBlocks = new BlockQueue<>();
/** A set of blocks to be invalidated by this datanode */
- private final LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
+ private final LightWeightHashSet<Block> invalidateBlocks =
+ new LightWeightHashSet<>();
/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
@@ -326,6 +360,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.invalidateBlocks.clear();
this.recoverBlocks.clear();
this.replicateBlocks.clear();
+ this.erasurecodeBlocks.clear();
}
// pendingCached, cached, and pendingUncached are protected by the
// FSN lock.
@@ -542,6 +577,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
/**
+ * Store block erasure coding work.
+ */
+ void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
+ DatanodeStorageInfo[] targets, short[] missingBlockIndicies) {
+ assert(block != null && sources != null && sources.length > 0);
+ BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
+ missingBlockIndicies);
+ erasurecodeBlocks.offer(task);
+ BlockManager.LOG.debug("Adding block recovery task " + task +
+ " to " + getName() + ", current queue size is " +
+ erasurecodeBlocks.size());
+ }
+
+ /**
* Store block recovery work.
*/
void addBlockToBeRecovered(BlockInfoContiguousUnderConstruction block) {
@@ -573,6 +622,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
/**
+ * The number of work items that are pending to be erasure coded
+ */
+ int getNumberOfBlocksToBeErasureCoded() {
+ return erasurecodeBlocks.size();
+ }
+
+ /**
* The number of block invalidation items that are pending to
* be sent to the datanode
*/
@@ -586,6 +642,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
return replicateBlocks.poll(maxTransfers);
}
+ public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) {
+ return erasurecodeBlocks.poll(maxTransfers);
+ }
+
public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
List<BlockInfoContiguousUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
if(blocks == null)
@@ -786,6 +846,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
if (repl > 0) {
sb.append(" ").append(repl).append(" blocks to be replicated;");
}
+ int ec = erasurecodeBlocks.size();
+ if(ec > 0) {
+ sb.append(" ").append(ec).append(" blocks to be erasure coded;");
+ }
int inval = invalidateBlocks.size();
if (inval > 0) {
sb.append(" ").append(inval).append(" blocks to be invalidated;");
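The new erasurecodeBlocks queue follows the same BlockQueue pattern as replicateBlocks: the NameNode offers tasks as it schedules recovery work, and the heartbeat path later polls up to maxTransfers of them. A generic stand-in for that pattern (not the actual BlockQueue class) looks roughly like this:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

class PendingTaskQueue<E> {
  private final Queue<E> tasks = new LinkedList<>();

  synchronized boolean offer(E task) {
    return tasks.offer(task);
  }

  /** Drain up to maxTransfers tasks, as the heartbeat handler does. */
  synchronized List<E> poll(int maxTransfers) {
    if (tasks.isEmpty()) {
      return null;
    }
    List<E> results = new ArrayList<>();
    for (int i = 0; i < maxTransfers && !tasks.isEmpty(); i++) {
      results.add(tasks.poll());
    }
    return results;
  }

  synchronized int size() {
    return tasks.size();
  }

  public static void main(String[] args) {
    PendingTaskQueue<String> ecTasks = new PendingTaskQueue<>();
    ecTasks.offer("recover blockGroup_1");
    ecTasks.offer("recover blockGroup_2");
    System.out.println(ecTasks.poll(1)); // [recover blockGroup_1]
    System.out.println(ecTasks.size());  // 1 task still queued
  }
}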
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 9179ff0..5c32f88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -1342,7 +1343,7 @@ public class DatanodeManager {
VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
- DatanodeDescriptor nodeinfo = null;
+ DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
@@ -1380,10 +1381,10 @@ public class DatanodeManager {
final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
final List<DatanodeStorageInfo> recoveryLocations =
- new ArrayList<DatanodeStorageInfo>(storages.length);
- for (int i = 0; i < storages.length; i++) {
- if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
- recoveryLocations.add(storages[i]);
+ new ArrayList<>(storages.length);
+ for (DatanodeStorageInfo storage : storages) {
+ if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
+ recoveryLocations.add(storage);
}
}
// If we are performing a truncate recovery than set recovery fields
@@ -1422,7 +1423,7 @@ public class DatanodeManager {
return new DatanodeCommand[] { brCommand };
}
- final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
+ final List<DatanodeCommand> cmds = new ArrayList<>();
//check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
@@ -1430,6 +1431,13 @@ public class DatanodeManager {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
+ // checking pending erasure coding tasks
+ List<BlockECRecoveryInfo> pendingECList =
+ nodeinfo.getErasureCodeCommand(maxTransfers);
+ if (pendingECList != null) {
+ cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_CODEC,
+ pendingECList));
+ }
//check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
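With the new queue in place, each heartbeat drains both pending lists and wraps every non-empty batch in its own command. A rough sketch of that assembly step, using placeholder command and task types rather than the real BlockCommand/BlockECRecoveryCommand:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HeartbeatCommandSketch {
  static final int DNA_TRANSFER = 1;  // replicate a block to new targets
  static final int DNA_CODEC = 11;    // erasure coding recovery (new action)

  static class Command {
    final int action;
    final List<String> tasks;
    Command(int action, List<String> tasks) {
      this.action = action;
      this.tasks = tasks;
    }
    @Override
    public String toString() {
      return "Command(action=" + action + ", tasks=" + tasks + ")";
    }
  }

  // Only emit a command when the corresponding queue produced work,
  // mirroring the null checks in the heartbeat handler.
  static List<Command> buildCommands(List<String> pendingReplication,
                                     List<String> pendingEcRecovery) {
    List<Command> cmds = new ArrayList<>();
    if (pendingReplication != null && !pendingReplication.isEmpty()) {
      cmds.add(new Command(DNA_TRANSFER, pendingReplication));
    }
    if (pendingEcRecovery != null && !pendingEcRecovery.isEmpty()) {
      cmds.add(new Command(DNA_CODEC, pendingEcRecovery));
    }
    return cmds;
  }

  public static void main(String[] args) {
    List<Command> cmds = buildCommands(
        Arrays.asList("blk_1 -> dn3"),
        Arrays.asList("blockGroup_7: rebuild index 3 on dn5"));
    for (Command c : cmds) {
      System.out.println(c);
    }
  }
}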
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index f4cf3ff..8b0e9a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -406,6 +406,7 @@ public class INodeFile extends INodeWithAdditionalFields
}
@Override // BlockCollection
+ // TODO: rename to reflect both replication and EC
public short getBlockReplication() {
short max = getFileReplication(CURRENT_STATE_ID);
FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature();
@@ -416,7 +417,8 @@ public class INodeFile extends INodeWithAdditionalFields
}
max = maxInSnapshot > max ? maxInSnapshot : max;
}
- return max;
+ return isStriped() ?
+ HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : max;
}
/** Set the replication factor of this file. */
@@ -1039,11 +1041,12 @@ public class INodeFile extends INodeWithAdditionalFields
Arrays.asList(snapshotBlocks).contains(block);
}
- @VisibleForTesting
/**
* @return true if the file is in the striping layout.
*/
- // TODO: move erasure coding policy to file XAttr (HDFS-7337)
+ @VisibleForTesting
+ @Override
+ // TODO: move erasure coding policy to file XAttr
public boolean isStriped() {
return getStoragePolicyID() == HdfsConstants.EC_STORAGE_POLICY_ID;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
new file mode 100644
index 0000000..f7f02fd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
+
+import java.util.Collection;
+
+/**
+ * A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a
+ * striped block group with missing blocks.
+ *
+ * Upon receiving this command, the DataNode pulls data from other DataNodes
+ * hosting blocks in this group and reconstructs the lost blocks through codec
+ * calculation.
+ *
+ * After the reconstruction, the DataNode pushes the reconstructed blocks to
+ * their final destinations if necessary (e.g., the destination is different
+ * from the reconstruction node, or multiple blocks in a group are to be
+ * reconstructed).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockECRecoveryCommand extends DatanodeCommand {
+ final Collection<BlockECRecoveryInfo> ecTasks;
+
+ /**
+ * Create BlockECRecoveryCommand from a collection of
+ * {@link BlockECRecoveryInfo}, each representing a recovery task
+ */
+ public BlockECRecoveryCommand(int action,
+ Collection<BlockECRecoveryInfo> blockECRecoveryInfoList) {
+ super(action);
+ this.ecTasks = blockECRecoveryInfoList;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("BlockECRecoveryCommand(\n ");
+ Joiner.on("\n ").appendTo(sb, ecTasks);
+ sb.append("\n)");
+ return sb.toString();
+ }
+}
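The DataNode-side worker that executes these tasks is not part of this patch; the class javadoc only spells out the expected flow (pull surviving blocks, decode the missing ones, push results to their targets). A hypothetical dispatch sketch of that shape, with placeholder types and no real reconstruction logic:

public class CommandDispatchSketch {
  static final int DNA_TRANSFER = 1;
  static final int DNA_CODEC = 11;  // erasure coding recovery

  interface Command { int action(); }

  static void dispatch(Command cmd) {
    switch (cmd.action()) {
      case DNA_TRANSFER:
        System.out.println("copy existing replicas to new targets");
        break;
      case DNA_CODEC:
        // Pull the surviving blocks of the group, decode the missing ones,
        // then push them to their target storages.
        System.out.println("reconstruct missing striped blocks");
        break;
      default:
        System.out.println("unhandled action " + cmd.action());
    }
  }

  public static void main(String[] args) {
    dispatch(() -> DNA_CODEC);
  }
}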
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 047de56..50955db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -77,6 +77,7 @@ public interface DatanodeProtocol {
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
final static int DNA_CACHE = 9; // cache blocks
final static int DNA_UNCACHE = 10; // uncache blocks
+ final static int DNA_CODEC = 11; // erasure coding recovery command
/**
* Register Datanode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index ae146e6..c4d7666 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -161,7 +161,7 @@ public class BlockManagerTestUtil {
*/
public static int computeAllPendingWork(BlockManager bm) {
int work = computeInvalidationWork(bm);
- work += bm.computeReplicationWork(Integer.MAX_VALUE);
+ work += bm.computeBlockRecoveryWork(Integer.MAX_VALUE);
return work;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index e5b9321..deb5ff0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -448,8 +448,8 @@ public class TestBlockManager {
assertEquals("Block not initially pending replication", 0,
bm.pendingReplications.getNumReplicas(block));
assertEquals(
- "computeReplicationWork should indicate replication is needed", 1,
- bm.computeReplicationWorkForBlocks(list_all));
+ "computeBlockRecoveryWork should indicate replication is needed", 1,
+ bm.computeRecoveryWorkForBlocks(list_all));
assertTrue("replication is pending after work is computed",
bm.pendingReplications.getNumReplicas(block) > 0);
@@ -503,22 +503,22 @@ public class TestBlockManager {
assertNotNull("Chooses source node for a highest-priority replication"
+ " even if all available source nodes have reached their replication"
+ " limits below the hard limit.",
- bm.chooseSourceDatanode(
- aBlock,
+ bm.chooseSourceDatanodes(
+ bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
- UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+ null, 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
assertNull("Does not choose a source node for a less-than-highest-priority"
+ " replication since all available source nodes have reached"
+ " their replication limits.",
- bm.chooseSourceDatanode(
- aBlock,
+ bm.chooseSourceDatanodes(
+ bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
- UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
+ null, 1, UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)[0]);
// Increase the replication count to test replication count > hard limit
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
@@ -526,12 +526,12 @@ public class TestBlockManager {
assertNull("Does not choose a source node for a highest-priority"
+ " replication when all available nodes exceed the hard limit.",
- bm.chooseSourceDatanode(
- aBlock,
+ bm.chooseSourceDatanodes(
+ bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
- UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+ null, 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c6a774f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
new file mode 100644
index 0000000..d883c9b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
+import static org.junit.Assert.assertTrue;
+
+public class TestRecoverStripedBlocks {
+ private final short GROUP_SIZE =
+ HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+ private final short NUM_OF_DATANODES = GROUP_SIZE + 1;
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem fs;
+ private static final int BLOCK_SIZE = 1024;
+ private HdfsAdmin dfsAdmin;
+ private FSNamesystem namesystem;
+ private Path ECFilePath;
+
+ @Before
+ public void setupCluster() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ // Large value to make sure the pending replication request can stay in
+ // DatanodeDescriptor.replicateBlocks before test timeout.
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
+ // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
+ // chooseUnderReplicatedBlocks at once.
+ conf.setInt(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
+
+ cluster = new MiniDFSCluster.Builder(conf).
+ numDataNodes(NUM_OF_DATANODES).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
+ namesystem = cluster.getNamesystem();
+ ECFilePath = new Path("/ecfile");
+ DFSTestUtil.createFile(fs, ECFilePath, 4 * BLOCK_SIZE, GROUP_SIZE, 0);
+ dfsAdmin.setStoragePolicy(ECFilePath, EC_STORAGE_POLICY_NAME);
+ }
+
+ @Test
+ public void testMissingStripedBlock() throws Exception {
+ final BlockManager bm = cluster.getNamesystem().getBlockManager();
+ ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, ECFilePath);
+ Iterator<DatanodeStorageInfo> storageInfos =
+ bm.blocksMap.getStorages(b.getLocalBlock())
+ .iterator();
+
+ DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
+ Iterator<BlockInfo> it = firstDn.getBlockIterator();
+ int missingBlkCnt = 0;
+ while (it.hasNext()) {
+ BlockInfo blk = it.next();
+ BlockManager.LOG.debug("Block " + blk + " will be lost");
+ missingBlkCnt++;
+ }
+ BlockManager.LOG.debug("Missing in total " + missingBlkCnt + " blocks");
+
+ bm.getDatanodeManager().removeDatanode(firstDn);
+
+ bm.computeDatanodeWork();
+
+ short cnt = 0;
+ for (DataNode dn : cluster.getDataNodes()) {
+ DatanodeDescriptor dnDescriptor =
+ bm.getDatanodeManager().getDatanode(dn.getDatanodeUuid());
+ cnt += dnDescriptor.getNumberOfBlocksToBeErasureCoded();
+ }
+
+ assertTrue("Counting the number of outstanding EC tasks", cnt == missingBlkCnt);
+ }
+}