You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/02/20 04:02:32 UTC
hadoop git commit: HDFS-9818. Correctly handle EC reconstruction work
caused by not enough racks. Contributed by Jing Zhao.
Repository: hadoop
Updated Branches:
refs/heads/trunk 6eae4337d -> e54cc2931
HDFS-9818. Correctly handle EC reconstruction work caused by not enough racks. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e54cc293
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e54cc293
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e54cc293
Branch: refs/heads/trunk
Commit: e54cc2931262bf49682a8323da9811976218c03b
Parents: 6eae433
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Feb 19 19:02:23 2016 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Feb 19 19:02:23 2016 -0800
----------------------------------------------------------------------
.../hadoop/hdfs/DFSStripedInputStream.java | 7 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../server/blockmanagement/BlockManager.java | 40 ++--
.../BlockPlacementPolicyDefault.java | 5 +-
.../BlockPlacementPolicyRackFaultTolerant.java | 10 +-
.../BlockPlacementStatusDefault.java | 10 +-
.../BlockReconstructionWork.java | 5 +
.../blockmanagement/ErasureCodingWork.java | 106 ++++++++-
.../server/blockmanagement/ReplicationWork.java | 5 +
.../erasurecode/ErasureCodingWorker.java | 25 ++-
.../TestBlocksWithNotEnoughRacks.java | 12 +-
...constructStripedBlocksWithRackAwareness.java | 213 +++++++++++++++++++
12 files changed, 388 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d15e536..3483255 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -260,7 +260,12 @@ public class DFSStripedInputStream extends DFSInputStream {
private void closeReader(BlockReaderInfo readerInfo) {
if (readerInfo != null) {
-// IOUtils.cleanup(null, readerInfo.reader);
+ if (readerInfo.reader != null) {
+ try {
+ readerInfo.reader.close();
+ } catch (Throwable ignored) {
+ }
+ }
readerInfo.skip();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a377243..1d0379c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -432,6 +432,9 @@ Trunk (Unreleased)
HDFS-9794. Streamer threads may leak if failure happens when closing the
striped outputstream. (jing9)
+ HDFS-9818. Correctly handle EC reconstruction work caused by not enough
+ racks. (jing9)
+
BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
HDFS-7347. Configurable erasure coding policy for individual files and
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 67f5026..cc52b6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1628,7 +1628,7 @@ public class BlockManager implements BlockStatsMXBean {
for (int i = 0 ; i < liveBlockIndices.size(); i++) {
indices[i] = liveBlockIndices.get(i);
}
- return new ErasureCodingWork(block, bc, srcNodes,
+ return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
priority, indices);
} else {
@@ -1638,6 +1638,16 @@ public class BlockManager implements BlockStatsMXBean {
}
}
+ private boolean isInNewRack(DatanodeDescriptor[] srcs,
+ DatanodeDescriptor target) {
+ for (DatanodeDescriptor src : srcs) {
+ if (src.getNetworkLocation().equals(target.getNetworkLocation())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private boolean validateReconstructionWork(BlockReconstructionWork rw) {
BlockInfo block = rw.getBlock();
int priority = rw.getPriority();
@@ -1665,31 +1675,14 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo[] targets = rw.getTargets();
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
(!isPlacementPolicySatisfied(block)) ) {
- if (rw.getSrcNodes()[0].getNetworkLocation().equals(
- targets[0].getDatanodeDescriptor().getNetworkLocation())) {
- //No use continuing, unless a new rack in this case
+ if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) {
+ // No use continuing, unless a new rack in this case
return false;
}
}
- // Add block to the to be reconstructed list
- if (block.isStriped()) {
- assert rw instanceof ErasureCodingWork;
- assert rw.getTargets().length > 0;
- assert pendingNum == 0 : "Should wait the previous reconstruction"
- + " to finish";
- final ErasureCodingPolicy ecPolicy =
- ((BlockInfoStriped) block).getErasureCodingPolicy();
- assert ecPolicy != null;
-
- rw.getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
- new ExtendedBlock(getBlockPoolId(), block),
- rw.getSrcNodes(), rw.getTargets(),
- ((ErasureCodingWork) rw).getLiveBlockIndicies(), ecPolicy);
- } else {
- rw.getSrcNodes()[0].addBlockToBeReplicated(block, targets);
- }
-
+ // Add block to the datanode's task list
+ rw.addTaskToDatanode();
DatanodeStorageInfo.incrementBlocksScheduled(targets);
// Move the block-replication into a "pending" state.
@@ -3973,7 +3966,8 @@ public class BlockManager implements BlockStatsMXBean {
.getPolicy(storedBlock.isStriped());
int numReplicas = storedBlock.isStriped() ? ((BlockInfoStriped) storedBlock)
.getRealDataBlockNum() : storedBlock.getReplication();
- return placementPolicy.verifyBlockPlacement(locs, numReplicas).isPlacementPolicySatisfied();
+ return placementPolicy.verifyBlockPlacement(locs, numReplicas)
+ .isPlacementPolicySatisfied();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index ee891a5..e1a47ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -901,7 +901,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
locs = DatanodeDescriptor.EMPTY_ARRAY;
if (!clusterMap.hasClusterEverBeenMultiRack()) {
// only one rack
- return new BlockPlacementStatusDefault(1, 1);
+ return new BlockPlacementStatusDefault(1, 1, 1);
}
int minRacks = 2;
minRacks = Math.min(minRacks, numberOfReplicas);
@@ -910,7 +910,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
Set<String> racks = new TreeSet<>();
for (DatanodeInfo dn : locs)
racks.add(dn.getNetworkLocation());
- return new BlockPlacementStatusDefault(racks.size(), minRacks);
+ return new BlockPlacementStatusDefault(racks.size(), minRacks,
+ clusterMap.getNumOfRacks());
}
/**
* Decide whether deleting the specified replica of the block still makes
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
index c803b97..c0d981c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
@@ -160,14 +160,16 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
locs = DatanodeDescriptor.EMPTY_ARRAY;
if (!clusterMap.hasClusterEverBeenMultiRack()) {
// only one rack
- return new BlockPlacementStatusDefault(1, 1);
+ return new BlockPlacementStatusDefault(1, 1, 1);
}
// 1. Check that all locations are different.
// 2. Count locations on different racks.
- Set<String> racks = new TreeSet<String>();
- for (DatanodeInfo dn : locs)
+ Set<String> racks = new TreeSet<>();
+ for (DatanodeInfo dn : locs) {
racks.add(dn.getNetworkLocation());
- return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas);
+ }
+ return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
+ clusterMap.getNumOfRacks());
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
index 0b8b965..75bb65d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java
@@ -21,15 +21,18 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus {
private int requiredRacks = 0;
private int currentRacks = 0;
+ private final int totalRacks;
- public BlockPlacementStatusDefault(int currentRacks, int requiredRacks){
+ public BlockPlacementStatusDefault(int currentRacks, int requiredRacks,
+ int totalRacks){
this.requiredRacks = requiredRacks;
this.currentRacks = currentRacks;
+ this.totalRacks = totalRacks;
}
@Override
public boolean isPlacementPolicySatisfied() {
- return requiredRacks <= currentRacks;
+ return requiredRacks <= currentRacks || currentRacks >= totalRacks;
}
@Override
@@ -38,7 +41,8 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus {
return null;
}
return "Block should be additionally replicated on " +
- (requiredRacks - currentRacks) + " more rack(s).";
+ (requiredRacks - currentRacks) +
+ " more rack(s). Total number of racks in the cluster: " + totalRacks;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
index df9c164..c1998ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
@@ -108,4 +108,9 @@ abstract class BlockReconstructionWork {
abstract void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes);
+
+ /**
+ * Add the reconstruction task to the datanode that will execute it.
+ */
+ abstract void addTaskToDatanode();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 85a25d5..38ad324 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -17,15 +17,23 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.Node;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
class ErasureCodingWork extends BlockReconstructionWork {
private final byte[] liveBlockIndicies;
+ private final String blockPoolId;
- public ErasureCodingWork(BlockInfo block,
+ public ErasureCodingWork(String blockPoolId, BlockInfo block,
BlockCollection bc,
DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes,
@@ -34,6 +42,7 @@ class ErasureCodingWork extends BlockReconstructionWork {
int priority, byte[] liveBlockIndicies) {
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
+ this.blockPoolId = blockPoolId;
this.liveBlockIndicies = liveBlockIndicies;
BlockManager.LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
block);
@@ -47,15 +56,92 @@ class ErasureCodingWork extends BlockReconstructionWork {
void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes) {
- try {
- // TODO: new placement policy for EC considering multiple writers
- DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
- getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
- getLiveReplicaStorages(), false, excludedNodes,
- getBlock().getNumBytes(),
- storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
- setTargets(chosenTargets);
- } finally {
+ // TODO: new placement policy for EC considering multiple writers
+ DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget(
+ getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0],
+ getLiveReplicaStorages(), false, excludedNodes,
+ getBlock().getNumBytes(),
+ storagePolicySuite.getPolicy(getBc().getStoragePolicyID()));
+ setTargets(chosenTargets);
+ }
+
+ /**
+ * @return true if the current source nodes cover all the internal blocks.
+ * I.e., we only need to have more racks.
+ */
+ private boolean hasAllInternalBlocks() {
+ final BlockInfoStriped block = (BlockInfoStriped) getBlock();
+ if (getSrcNodes().length < block.getRealTotalBlockNum()) {
+ return false;
+ }
+ BitSet bitSet = new BitSet(block.getTotalBlockNum());
+ for (byte index : liveBlockIndicies) {
+ bitSet.set(index);
+ }
+ for (int i = 0; i < block.getRealDataBlockNum(); i++) {
+ if (!bitSet.get(i)) {
+ return false;
+ }
+ }
+ for (int i = block.getDataBlockNum(); i < block.getTotalBlockNum(); i++) {
+ if (!bitSet.get(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * We have all the internal blocks but not enough racks. Thus we do not need
+ * to do decoding but only simply make an extra copy of an internal block. In
+ * this scenario, use this method to choose the source datanode for simple
+ * replication.
+ * @return The index of the source datanode.
+ */
+ private int chooseSource4SimpleReplication() {
+ Map<String, List<Integer>> map = new HashMap<>();
+ for (int i = 0; i < getSrcNodes().length; i++) {
+ final String rack = getSrcNodes()[i].getNetworkLocation();
+ List<Integer> dnList = map.get(rack);
+ if (dnList == null) {
+ dnList = new ArrayList<>();
+ map.put(rack, dnList);
+ }
+ dnList.add(i);
+ }
+ List<Integer> max = null;
+ for (Map.Entry<String, List<Integer>> entry : map.entrySet()) {
+ if (max == null || entry.getValue().size() > max.size()) {
+ max = entry.getValue();
+ }
+ }
+ assert max != null;
+ return max.get(0);
+ }
+
+ @Override
+ void addTaskToDatanode() {
+ assert getTargets().length > 0;
+ BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
+
+ // if we already have all the internal blocks, but not enough racks,
+ // we only need to replicate one internal block to a new rack
+ if (hasAllInternalBlocks()) {
+ int sourceIndex = chooseSource4SimpleReplication();
+ final byte blockIndex = liveBlockIndicies[sourceIndex];
+ final DatanodeDescriptor source = getSrcNodes()[sourceIndex];
+ final long internBlkLen = StripedBlockUtil.getInternalBlockLength(
+ stripedBlk.getNumBytes(), stripedBlk.getCellSize(),
+ stripedBlk.getDataBlockNum(), blockIndex);
+ final Block targetBlk = new Block(
+ stripedBlk.getBlockId() + blockIndex, internBlkLen,
+ stripedBlk.getGenerationStamp());
+ source.addBlockToBeReplicated(targetBlk, getTargets());
+ } else {
+ getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+ new ExtendedBlock(blockPoolId, stripedBlk),
+ getSrcNodes(), getTargets(), getLiveBlockIndicies(),
+ stripedBlk.getErasureCodingPolicy());
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
index b44b9b1..24601a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
@@ -53,4 +53,9 @@ class ReplicationWork extends BlockReconstructionWork {
getSrcNodes()[0].decrementPendingReplicationWithoutTargets();
}
}
+
+ @Override
+ void addTaskToDatanode() {
+ getSrcNodes()[0].addBlockToBeReplicated(getBlock(), getTargets());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index b08aa2e..1017e1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -177,8 +177,14 @@ public final class ErasureCodingWorker {
Collection<BlockECReconstructionInfo> ecTasks) {
for (BlockECReconstructionInfo reconstructionInfo : ecTasks) {
try {
- EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL
- .submit(new ReconstructAndTransferBlock(reconstructionInfo));
+ ReconstructAndTransferBlock task =
+ new ReconstructAndTransferBlock(reconstructionInfo);
+ if (task.hasValidTargets()) {
+ EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL.submit(task);
+ } else {
+ LOG.warn("No missing internal block. Skip reconstruction for task:{}",
+ reconstructionInfo);
+ }
} catch (Throwable e) {
LOG.warn("Failed to reconstruct striped block {}",
reconstructionInfo.getExtendedBlock().getLocalBlock(), e);
@@ -292,6 +298,7 @@ public final class ErasureCodingWorker {
private final CompletionService<Void> readService =
new ExecutorCompletionService<>(
EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL);
+ private final boolean hasValidTargets;
ReconstructAndTransferBlock(BlockECReconstructionInfo reconstructionInfo) {
ErasureCodingPolicy ecPolicy = reconstructionInfo
@@ -339,10 +346,14 @@ public final class ErasureCodingWorker {
seqNo4Targets[i] = 0;
}
- getTargetIndices();
+ hasValidTargets = getTargetIndices();
cachingStrategy = CachingStrategy.newDefaultStrategy();
}
+ boolean hasValidTargets() {
+ return hasValidTargets;
+ }
+
private ByteBuffer allocateBuffer(int length) {
return ByteBuffer.allocate(length);
}
@@ -505,24 +516,30 @@ public final class ErasureCodingWorker {
}
}
- private void getTargetIndices() {
+ /**
+ * @return true if there is a valid target for reconstruction
+ */
+ private boolean getTargetIndices() {
BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
for (int i = 0; i < sources.length; i++) {
bitset.set(liveIndices[i]);
}
int m = 0;
int k = 0;
+ boolean hasValidTarget = false;
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
if (!bitset.get(i)) {
if (getBlockLen(blockGroup, i) > 0) {
if (m < targets.length) {
targetIndices[m++] = (short)i;
+ hasValidTarget = true;
}
} else {
zeroStripeIndices[k++] = (short)i;
}
}
}
+ return hasValidTarget;
}
/** the reading length should not exceed the length for reconstruction. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
index ea994a2..d91155a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
@@ -285,7 +285,7 @@ public class TestBlocksWithNotEnoughRacks {
final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
try {
- // Create a file with one block with a replication factor of 2
+ // Create a file with one block with a replication factor of 3
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
@@ -315,8 +315,9 @@ public class TestBlocksWithNotEnoughRacks {
dm.removeDatanode(dnId);
// Make sure we have enough live replicas even though we are
- // short one rack and therefore need one replica
- DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+ // short one rack. The cluster now has only 1 rack thus we just make sure
+ // we still have 3 replicas.
+ DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
@@ -357,9 +358,8 @@ public class TestBlocksWithNotEnoughRacks {
// The block gets re-replicated to another datanode so it has a
// sufficient # replicas, but not across racks, so there should
- // be 1 rack, and 1 needed replica (even though there are 2 hosts
- // available and only 2 replicas required).
- DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+ // be 1 rack.
+ DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
// Start the "failed" datanode, which has a replica so the block is
// now over-replicated and therefore a replica should be removed but
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
new file mode 100644
index 0000000..2164957
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
+
+public class TestReconstructStripedBlocksWithRackAwareness {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ TestReconstructStripedBlocksWithRackAwareness.class);
+
+ static {
+ GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
+ }
+
+ private static final String[] hosts = new String[]{"host1", "host2", "host3",
+ "host4", "host5", "host6", "host7", "host8", "host9", "host10"};
+ private static final String[] racks = new String[]{"/r1", "/r1", "/r2", "/r2",
+ "/r3", "/r3", "/r4", "/r4", "/r5", "/r6"};
+ private static final List<String> singleNodeRacks = Arrays.asList("host9", "host10");
+ private static final short blockNum = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
+
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem fs;
+ private FSNamesystem fsn;
+ private BlockManager bm;
+
+ @Before
+ public void setup() throws Exception {
+ final HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+ false);
+
+ cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
+ .numDataNodes(hosts.length).build();
+ cluster.waitActive();
+
+ fsn = cluster.getNamesystem();
+ bm = fsn.getBlockManager();
+
+ fs = cluster.getFileSystem();
+ fs.setErasureCodingPolicy(new Path("/"), null);
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * When there are all the internal blocks available but they are not placed on
+ * enough racks, NameNode should avoid normal decoding reconstruction but copy
+ * an internal block to a new rack.
+ *
+ * In this test, we first need to create a scenario that a striped block has
+ * all the internal blocks but distributed in <6 racks. Then we check if the
+ * replication monitor can correctly schedule the reconstruction work for it.
+ *
+ * For the 9 internal blocks + 5 racks setup, the test does the following:
+ * 1. create a 6 rack cluster with 10 datanodes, where 2 racks contain only
+ * 1 datanode each
+ * 2. for a striped block with 9 internal blocks, there must be one internal
+ * block located in a single-node rack. find this node and stop it
+ * 3. namenode will trigger reconstruction for the block and since the cluster
+ * has only 5 racks remaining, after the reconstruction we have 9 internal
+ * blocks distributed in 5 racks.
+ * 4. we bring the datanode back, now the cluster has 6 racks again
+ * 5. let the datanode call reportBadBlock; this will make the namenode
+ * check if the striped block is placed in >= 6 racks, and the namenode will
+ * put the block into the under-replicated queue
+ * 6. now we can check if the replication monitor works as expected
+ */
+ @Test
+ public void testReconstructForNotEnoughRacks() throws Exception {
+ final Path file = new Path("/foo");
+ DFSTestUtil.createFile(fs, file,
+ BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
+ Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+
+ final INodeFile fileNode = fsn.getFSDirectory()
+ .getINode4Write(file.toString()).asFile();
+ BlockInfoStriped blockInfo = (BlockInfoStriped) fileNode.getLastBlock();
+
+ // find the internal block located in the single node rack
+ Block internalBlock = null;
+ String hostToStop = null;
+ for (DatanodeStorageInfo storage : blockInfo.storages) {
+ if (singleNodeRacks.contains(storage.getDatanodeDescriptor().getHostName())) {
+ hostToStop = storage.getDatanodeDescriptor().getHostName();
+ internalBlock = blockInfo.getBlockOnStorage(storage);
+ }
+ }
+ Assert.assertNotNull(internalBlock);
+ Assert.assertNotNull(hostToStop);
+
+ // delete the block on the chosen datanode
+ cluster.corruptBlockOnDataNodesByDeletingBlockFile(
+ new ExtendedBlock(bm.getBlockPoolId(), internalBlock));
+
+ // stop the chosen datanode
+ MiniDFSCluster.DataNodeProperties dnProp = null;
+ for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+ DataNode dn = cluster.getDataNodes().get(i);
+ if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
+ dnProp = cluster.stopDataNode(i);
+ cluster.setDataNodeDead(dn.getDatanodeId());
+ LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
+ }
+ }
+ NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology();
+ Assert.assertEquals(5, topology.getNumOfRacks());
+
+ // make sure the reconstruction work can finish
+ // now we have 9 internal blocks in 5 racks
+ DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
+
+ // we now should have 9 internal blocks distributed in 5 racks
+ Set<String> rackSet = new HashSet<>();
+ for (DatanodeStorageInfo storage : blockInfo.storages) {
+ rackSet.add(storage.getDatanodeDescriptor().getNetworkLocation());
+ }
+ Assert.assertEquals(5, rackSet.size());
+
+ // restart the stopped datanode
+ cluster.restartDataNode(dnProp);
+ cluster.waitActive();
+
+ // make sure we have 6 racks again
+ topology = bm.getDatanodeManager().getNetworkTopology();
+ Assert.assertEquals(hosts.length, topology.getNumOfLeaves());
+ Assert.assertEquals(6, topology.getNumOfRacks());
+
+ // pause all the heartbeats
+ DataNode badDn = null;
+ for (DataNode dn : cluster.getDataNodes()) {
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+ if (dn.getDatanodeId().getHostName().equals(hostToStop)) {
+ badDn = dn;
+ }
+ }
+ assert badDn != null;
+ // let the DN report the bad block, so that the namenode will put the block
+ // into under-replicated queue. note that the block still has 9 internal
+ // blocks but in 5 racks
+ badDn.reportBadBlocks(new ExtendedBlock(bm.getBlockPoolId(), internalBlock));
+
+ // check if the replication monitor correctly schedules the replication work
+ boolean scheduled = false;
+ for (int i = 0; i < 5; i++) { // retry 5 times
+ for (DatanodeStorageInfo storage : blockInfo.storages) {
+ if (storage != null) {
+ DatanodeDescriptor dn = storage.getDatanodeDescriptor();
+ Assert.assertEquals(0, dn.getNumberOfBlocksToBeErasureCoded());
+ if (dn.getNumberOfBlocksToBeReplicated() == 1) {
+ scheduled = true;
+ }
+ }
+ }
+ if (scheduled) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Assert.assertTrue(scheduled);
+ }
+}