Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/03/30 22:35:52 UTC
hadoop git commit: HDFS-8005. Erasure Coding: simplify striped block recovery work computation and add tests. Contributed by Jing Zhao.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 a1075153e -> 5ef6204c0
HDFS-8005. Erasure Coding: simplify striped block recovery work computation and add tests. Contributed by Jing Zhao.
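In short, the patch replaces the old missingBlockIndices/numSourceNodes plumbing with a single liveBlockIndices list that chooseSourceDatanodes fills in while it scans the block's storages: for a striped block every live replica is a useful source, so all of them are kept, while a replicated block still gets one source node. A condensed sketch of the new scheduling flow, paraphrased from the BlockManager diff below (surrounding state such as recovWork and additionalReplRequired is elided, so this is not a complete excerpt):

    // Collect source nodes; for striped blocks this also records which
    // block indices in the group are still live.
    List<Short> liveBlockIndices = new ArrayList<>();
    final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
        containingNodes, liveReplicaNodes, numReplicas,
        liveBlockIndices, priority);

    if (block.isStriped()) {
      // Pass the healthy indices along with the task, instead of deriving
      // the missing ones afterwards as the old code did.
      short[] indices = new short[liveBlockIndices.size()];
      for (int i = 0; i < liveBlockIndices.size(); i++) {
        indices[i] = liveBlockIndices.get(i);
      }
      recovWork.add(new ErasureCodingWork(block, bc, srcNodes,
          containingNodes, liveReplicaNodes, additionalReplRequired,
          priority, indices));
    } else {
      recovWork.add(new ReplicationWork(block, bc, srcNodes,
          containingNodes, liveReplicaNodes, additionalReplRequired,
          priority));
    }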
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ef6204c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ef6204c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ef6204c
Branch: refs/heads/HDFS-7285
Commit: 5ef6204c01f96be6d6c93cf797330dc6eaaeac65
Parents: a107515
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Mar 30 13:35:36 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Mar 30 13:35:36 2015 -0700
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 138 +++++-------
.../blockmanagement/DatanodeDescriptor.java | 14 +-
.../hadoop/hdfs/server/namenode/INodeFile.java | 1 +
.../blockmanagement/TestBlockManager.java | 33 +--
.../TestRecoverStripedBlocks.java | 107 ----------
.../server/namenode/TestAddStripedBlocks.java | 2 +-
.../namenode/TestRecoverStripedBlocks.java | 210 +++++++++++++++++++
7 files changed, 292 insertions(+), 213 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef6204c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7e8a88c..063b396 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -538,7 +538,7 @@ public class BlockManager {
// source node returned is not used
chooseSourceDatanodes(getStoredBlock(block), containingNodes,
containingLiveReplicasNodes, numReplicas,
- new LinkedList<Short>(), 1, UnderReplicatedBlocks.LEVEL);
+ new LinkedList<Short>(), UnderReplicatedBlocks.LEVEL);
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
@@ -1376,7 +1376,7 @@ public class BlockManager {
int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes;
- BlockCollection bc = null;
+ BlockCollection bc;
int additionalReplRequired;
int scheduledWork = 0;
@@ -1404,13 +1404,10 @@ public class BlockManager {
containingNodes = new ArrayList<>();
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas();
- List<Short> missingBlockIndices = new LinkedList<>();
- DatanodeDescriptor[] srcNodes;
- int numSourceNodes = bc.isStriped() ?
- HdfsConstants.NUM_DATA_BLOCKS : 1;
- srcNodes = chooseSourceDatanodes(
- block, containingNodes, liveReplicaNodes, numReplicas,
- missingBlockIndices, numSourceNodes, priority);
+ List<Short> liveBlockIndices = new ArrayList<>();
+ final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
+ containingNodes, liveReplicaNodes, numReplicas,
+ liveBlockIndices, priority);
if(srcNodes == null || srcNodes.length == 0) {
// block can not be replicated from any node
LOG.debug("Block " + block + " cannot be recovered " +
@@ -1442,15 +1439,14 @@ public class BlockManager {
} else {
additionalReplRequired = 1; // Needed on a new rack
}
- if (bc.isStriped()) {
+ if (block.isStriped()) {
+ short[] indices = new short[liveBlockIndices.size()];
+ for (int i = 0 ; i < liveBlockIndices.size(); i++) {
+ indices[i] = liveBlockIndices.get(i);
+ }
ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
- priority);
- short[] missingBlockArray = new short[missingBlockIndices.size()];
- for (int i = 0 ; i < missingBlockIndices.size(); i++) {
- missingBlockArray[i] = missingBlockIndices.get(i);
- }
- ecw.setMissingBlockIndices(missingBlockArray);
+ priority, indices);
recovWork.add(ecw);
} else {
recovWork.add(new ReplicationWork(block, bc, srcNodes,
@@ -1530,15 +1526,14 @@ public class BlockManager {
}
// Add block to the to be replicated list
- if (bc.isStriped()) {
+ if (block.isStriped()) {
assert rw instanceof ErasureCodingWork;
assert rw.targets.length > 0;
rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(namesystem.getBlockPoolId(), block),
rw.srcNodes, rw.targets,
- ((ErasureCodingWork)rw).getMissingBlockIndicies());
- }
- else {
+ ((ErasureCodingWork) rw).liveBlockIndicies);
+ } else {
rw.srcNodes[0].addBlockToBeReplicated(block, targets);
}
scheduledWork++;
@@ -1568,9 +1563,9 @@ public class BlockManager {
DatanodeStorageInfo[] targets = rw.targets;
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
- for (int k = 0; k < targets.length; k++) {
+ for (DatanodeStorageInfo target : targets) {
targetList.append(' ');
- targetList.append(targets[k].getDatanodeDescriptor());
+ targetList.append(target.getDatanodeDescriptor());
}
blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
rw.block, targetList);
@@ -1681,11 +1676,8 @@ public class BlockManager {
* @param numReplicas NumberReplicas instance to be initialized with the
* counts of live, corrupt, excess, and decommissioned
* replicas of the given block.
- * @param missingBlockIndices List to be populated with indices of missing
- * blocks in a striped block group or missing
- * replicas of a replicated block
- * @param numSourceNodes integer specifying the number of source nodes to
- * choose
+ * @param liveBlockIndices List to be populated with indices of healthy
+ * blocks in a striped block group
* @param priority integer representing replication priority of the given
* block
* @return the array of DatanodeDescriptor of the chosen nodes from which to
@@ -1696,32 +1688,28 @@ public class BlockManager {
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
NumberReplicas numReplicas,
- List<Short> missingBlockIndices, int numSourceNodes, int priority) {
+ List<Short> liveBlockIndices, int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
- LinkedList<DatanodeDescriptor> srcNodes = new LinkedList<>();
+ List<DatanodeDescriptor> srcNodes = new ArrayList<>();
int live = 0;
int decommissioned = 0;
int corrupt = 0;
int excess = 0;
- missingBlockIndices.clear();
- Set<Short> healthyIndices = new HashSet<>();
+ liveBlockIndices.clear();
+ final boolean isStriped = block.isStriped();
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
- for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
- if (block.isStriped()) {
- healthyIndices.add((short) ((BlockInfoStriped) block).
- getStorageBlockIndex(storage));
- }
+ for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
LightWeightLinkedSet<BlockInfo> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid());
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt += countableReplica;
- else if (node.isDecommissionInProgress() || node.isDecommissioned())
+ else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
decommissioned += countableReplica;
- else if (excessBlocks != null && excessBlocks.contains(block)) {
+ } else if (excessBlocks != null && excessBlocks.contains(block)) {
excess += countableReplica;
} else {
nodesContainingLiveReplicas.add(storage);
@@ -1749,27 +1737,19 @@ public class BlockManager {
if(node.isDecommissioned())
continue;
- // We got this far, current node is a reasonable choice
- if(srcNodes.size() < numSourceNodes) {
+ if(isStriped || srcNodes.isEmpty()) {
srcNodes.add(node);
+ if (isStriped) {
+ liveBlockIndices.add((short) ((BlockInfoStriped) block).
+ getStorageBlockIndex(storage));
+ }
continue;
}
- // switch to a different node randomly
+ // for replicated block, switch to a different node randomly
// this to prevent from deterministically selecting the same node even
// if the node failed to replicate the block on previous iterations
- if(DFSUtil.getRandom().nextBoolean()) {
- int pos = DFSUtil.getRandom().nextInt(numSourceNodes);
- if(!srcNodes.get(pos).isDecommissionInProgress()) {
- srcNodes.set(pos, node);
- }
- }
- }
- if (block.isStriped()) {
- for (short i = 0; i < HdfsConstants.NUM_DATA_BLOCKS +
- HdfsConstants.NUM_PARITY_BLOCKS; i++) {
- if (!healthyIndices.contains(i)) {
- missingBlockIndices.add(i);
- }
+ if (!isStriped && DFSUtil.getRandom().nextBoolean()) {
+ srcNodes.set(0, node);
}
}
if(numReplicas != null) {
@@ -3783,25 +3763,25 @@ public class BlockManager {
* to represent a task to recover a block through replication or erasure
* coding. Recovery is done by transferring data from srcNodes to targets
*/
- private static class BlockRecoveryWork {
- protected final BlockInfo block;
- protected final BlockCollection bc;
+ private abstract static class BlockRecoveryWork {
+ final BlockInfo block;
+ final BlockCollection bc;
/**
* An erasure coding recovery task has multiple source nodes.
* A replication task only has 1 source node, stored on top of the array
*/
- protected final DatanodeDescriptor[] srcNodes;
+ final DatanodeDescriptor[] srcNodes;
/** Nodes containing the block; avoid them in choosing new targets */
- protected final List<DatanodeDescriptor> containingNodes;
+ final List<DatanodeDescriptor> containingNodes;
/** Required by {@link BlockPlacementPolicy#chooseTarget} */
- protected final List<DatanodeStorageInfo> liveReplicaStorages;
- protected final int additionalReplRequired;
+ final List<DatanodeStorageInfo> liveReplicaStorages;
+ final int additionalReplRequired;
- protected DatanodeStorageInfo[] targets;
- protected final int priority;
+ DatanodeStorageInfo[] targets;
+ final int priority;
- public BlockRecoveryWork(BlockInfo block,
+ BlockRecoveryWork(BlockInfo block,
BlockCollection bc,
DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes,
@@ -3818,15 +3798,13 @@ public class BlockManager {
this.targets = null;
}
- protected void chooseTargets(BlockPlacementPolicy blockplacement,
+ abstract void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
- Set<Node> excludedNodes) {
- }
+ Set<Node> excludedNodes);
}
private static class ReplicationWork extends BlockRecoveryWork {
-
- public ReplicationWork(BlockInfo block,
+ ReplicationWork(BlockInfo block,
BlockCollection bc,
DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes,
@@ -3838,7 +3816,8 @@ public class BlockManager {
LOG.debug("Creating a ReplicationWork to recover " + block);
}
- protected void chooseTargets(BlockPlacementPolicy blockplacement,
+ @Override
+ void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes) {
assert srcNodes.length > 0
@@ -3855,30 +3834,23 @@ public class BlockManager {
}
private static class ErasureCodingWork extends BlockRecoveryWork {
+ final short[] liveBlockIndicies;
- private short[] missingBlockIndicies = null;
-
- public ErasureCodingWork(BlockInfo block,
+ ErasureCodingWork(BlockInfo block,
BlockCollection bc,
DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired,
- int priority) {
+ int priority, short[] liveBlockIndicies) {
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
+ this.liveBlockIndicies = liveBlockIndicies;
LOG.debug("Creating an ErasureCodingWork to recover " + block);
}
- public short[] getMissingBlockIndicies() {
- return missingBlockIndicies;
- }
-
- public void setMissingBlockIndices(short[] missingBlockIndicies) {
- this.missingBlockIndicies = missingBlockIndicies;
- }
-
- protected void chooseTargets(BlockPlacementPolicy blockplacement,
+ @Override
+ void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes) {
try {
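The DatanodeDescriptor changes that follow are the receiving end of the same rename: the queued EC recovery task now carries the indices that are still live rather than the ones that are missing. A minimal sketch of the hand-off under the new field names, again paraphrased from the diff rather than quoted whole:

    // BlockManager side: the first chosen target receives the EC task
    // together with the surviving block indices.
    rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
        new ExtendedBlock(namesystem.getBlockPoolId(), block),
        rw.srcNodes, rw.targets,
        ((ErasureCodingWork) rw).liveBlockIndicies);

    // DatanodeDescriptor side: the task is queued as a BlockECRecoveryInfo
    // whose liveBlockIndices tell the recovery worker which cells survive.
    BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
        liveBlockIndices);
    erasurecodeBlocks.offer(task);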
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef6204c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 9f2a4de..5ae3074 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -106,14 +106,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
public final ExtendedBlock block;
public final DatanodeDescriptor[] sources;
public final DatanodeStorageInfo[] targets;
- public final short[] missingBlockIndices;
+ public final short[] liveBlockIndices;
BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources,
- DatanodeStorageInfo[] targets, short[] missingBlockIndices) {
+ DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
this.block = block;
this.sources = sources;
this.targets = targets;
- this.missingBlockIndices = missingBlockIndices;
+ this.liveBlockIndices = liveBlockIndices;
}
@Override
@@ -122,6 +122,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
append("Recovering ").append(block).
append(" From: ").append(Arrays.asList(sources)).
append(" To: ").append(Arrays.asList(targets)).append(")\n").
+ append(" Block Indices: ").append(Arrays.asList(liveBlockIndices)).
toString();
}
}
@@ -630,10 +631,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
* Store block erasure coding work.
*/
void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
- DatanodeStorageInfo[] targets, short[] missingBlockIndicies) {
+ DatanodeStorageInfo[] targets, short[] liveBlockIndices) {
assert(block != null && sources != null && sources.length > 0);
BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
- missingBlockIndicies);
+ liveBlockIndices);
erasurecodeBlocks.offer(task);
BlockManager.LOG.debug("Adding block recovery task " + task +
"to " + getName() + ", current queue size is " +
@@ -674,7 +675,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
/**
* The number of work items that are pending to be replicated
*/
- int getNumberOfBlocksToBeErasureCoded() {
+ @VisibleForTesting
+ public int getNumberOfBlocksToBeErasureCoded() {
return erasurecodeBlocks.size();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef6204c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 59aabe4..8f7a72c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -402,6 +402,7 @@ public class INodeFile extends INodeWithAdditionalFields
/** The same as getFileReplication(null). */
@Override // INodeFileAttributes
+ // TODO striped
public final short getFileReplication() {
return getFileReplication(CURRENT_STATE_ID);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef6204c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 43f4607..f7504ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -508,30 +508,33 @@ public class TestBlockManager {
cntNodes,
liveNodes,
new NumberReplicas(),
- new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
+ new ArrayList<Short>(),
+ UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
- assertNull("Does not choose a source node for a less-than-highest-priority"
- + " replication since all available source nodes have reached"
- + " their replication limits.",
+ assertEquals("Does not choose a source node for a less-than-highest-priority"
+ + " replication since all available source nodes have reached"
+ + " their replication limits.", 0,
bm.chooseSourceDatanodes(
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
- new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)[0]);
+ new ArrayList<Short>(),
+ UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length);
// Increase the replication count to test replication count > hard limit
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
- assertNull("Does not choose a source node for a highest-priority"
- + " replication when all available nodes exceed the hard limit.",
+ assertEquals("Does not choose a source node for a highest-priority"
+ + " replication when all available nodes exceed the hard limit.", 0,
bm.chooseSourceDatanodes(
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
- new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
+ new ArrayList<Short>(),
+ UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length);
}
@Test
@@ -556,26 +559,24 @@ public class TestBlockManager {
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
- new NumberReplicas(), new LinkedList<Short>(), 1,
- UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED));
+ new NumberReplicas(), new LinkedList<Short>(),
+ UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]);
// Increase the replication count to test replication count > hard limit
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
- assertNull("Does not choose a source decommissioning node for a normal"
- + " replication when all available nodes exceed the hard limit.",
+ assertEquals("Does not choose a source decommissioning node for a normal"
+ + " replication when all available nodes exceed the hard limit.", 0,
bm.chooseSourceDatanodes(
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
- new NumberReplicas(), new LinkedList<Short>(), 1,
- UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED));
+ new NumberReplicas(), new LinkedList<Short>(),
+ UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length);
}
-
-
@Test
public void testSafeModeIBR() throws Exception {
DatanodeDescriptor node = spy(nodes.get(0));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef6204c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
deleted file mode 100644
index d883c9b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
-import static org.junit.Assert.assertTrue;
-
-public class TestRecoverStripedBlocks {
- private final short GROUP_SIZE =
- HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
- private final short NUM_OF_DATANODES = GROUP_SIZE + 1;
- private Configuration conf;
- private MiniDFSCluster cluster;
- private DistributedFileSystem fs;
- private static final int BLOCK_SIZE = 1024;
- private HdfsAdmin dfsAdmin;
- private FSNamesystem namesystem;
- private Path ECFilePath;
-
- @Before
- public void setupCluster() throws IOException {
- conf = new HdfsConfiguration();
- conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
- // Large value to make sure the pending replication request can stay in
- // DatanodeDescriptor.replicateBlocks before test timeout.
- conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
- // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
- // chooseUnderReplicatedBlocks at once.
- conf.setInt(
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
-
- cluster = new MiniDFSCluster.Builder(conf).
- numDataNodes(NUM_OF_DATANODES).build();
- cluster.waitActive();
- fs = cluster.getFileSystem();
- dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
- namesystem = cluster.getNamesystem();
- ECFilePath = new Path("/ecfile");
- DFSTestUtil.createFile(fs, ECFilePath, 4 * BLOCK_SIZE, GROUP_SIZE, 0);
- dfsAdmin.setStoragePolicy(ECFilePath, EC_STORAGE_POLICY_NAME);
- }
-
- @Test
- public void testMissingStripedBlock() throws Exception {
- final BlockManager bm = cluster.getNamesystem().getBlockManager();
- ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, ECFilePath);
- Iterator<DatanodeStorageInfo> storageInfos =
- bm.blocksMap.getStorages(b.getLocalBlock())
- .iterator();
-
- DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
- Iterator<BlockInfo> it = firstDn.getBlockIterator();
- int missingBlkCnt = 0;
- while (it.hasNext()) {
- BlockInfo blk = it.next();
- BlockManager.LOG.debug("Block " + blk + " will be lost");
- missingBlkCnt++;
- }
- BlockManager.LOG.debug("Missing in total " + missingBlkCnt + " blocks");
-
- bm.getDatanodeManager().removeDatanode(firstDn);
-
- bm.computeDatanodeWork();
-
- short cnt = 0;
- for (DataNode dn : cluster.getDataNodes()) {
- DatanodeDescriptor dnDescriptor =
- bm.getDatanodeManager().getDatanode(dn.getDatanodeUuid());
- cnt += dnDescriptor.getNumberOfBlocksToBeErasureCoded();
- }
-
- assertTrue("Counting the number of outstanding EC tasks", cnt == missingBlkCnt);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef6204c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 7d7c81e..215a4e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -224,7 +224,7 @@ public class TestAddStripedBlocks {
int i = 0;
for (DataNode dn : cluster.getDataNodes()) {
final Block block = new Block(lastBlock.getBlockId() + i++,
- lastBlock.getGenerationStamp(), 0);
+ 0, lastBlock.getGenerationStamp());
DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
storageIDs.add(storage.getStorageID());
StorageReceivedDeletedBlocks[] reports = DFSTestUtil
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ef6204c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
new file mode 100644
index 0000000..b9fd4fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestRecoverStripedBlocks {
+ private final short GROUP_SIZE =
+ NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+ private MiniDFSCluster cluster;
+ private final Path dirPath = new Path("/dir");
+ private Path filePath = new Path(dirPath, "file");
+
+ @Before
+ public void setup() throws IOException {
+ final Configuration conf = new HdfsConfiguration();
+ // Large value to make sure the pending replication request can stay in
+ // DatanodeDescriptor.replicateBlocks before test timeout.
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
+ // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
+ // chooseUnderReplicatedBlocks at once.
+ conf.setInt(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
+
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1)
+ .build();
+ cluster.waitActive();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
+ int numBlocks) throws Exception {
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ dfs.mkdirs(dir);
+ dfs.setStoragePolicy(dir, EC_STORAGE_POLICY_NAME);
+
+ FSDataOutputStream out = null;
+ try {
+ out = dfs.create(file, (short) 1); // create an empty file
+
+ FSNamesystem ns = cluster.getNamesystem();
+ FSDirectory fsdir = ns.getFSDirectory();
+ INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+
+ ExtendedBlock previous = null;
+ for (int i = 0; i < numBlocks; i++) {
+ Block newBlock = createBlock(cluster.getDataNodes(), ns,
+ file.toString(), fileNode, dfs.getClient().getClientName(),
+ previous);
+ previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
+ }
+
+ ns.completeFile(file.toString(), dfs.getClient().getClientName(),
+ previous, fileNode.getId());
+ } finally {
+ IOUtils.cleanup(null, out);
+ }
+ }
+
+ static Block createBlock(List<DataNode> dataNodes, FSNamesystem ns,
+ String file, INodeFile fileNode, String clientName,
+ ExtendedBlock previous) throws Exception {
+ ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null,
+ null);
+
+ final BlockInfo lastBlock = fileNode.getLastBlock();
+ final int groupSize = fileNode.getBlockReplication();
+ // 1. RECEIVING_BLOCK IBR
+ int i = 0;
+ for (DataNode dn : dataNodes) {
+ if (i < groupSize) {
+ final Block block = new Block(lastBlock.getBlockId() + i++, 0,
+ lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+ }
+
+ // 2. RECEIVED_BLOCK IBR
+ i = 0;
+ for (DataNode dn : dataNodes) {
+ if (i < groupSize) {
+ final Block block = new Block(lastBlock.getBlockId() + i++,
+ BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+ }
+
+ lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS);
+ return lastBlock;
+ }
+
+ @Test
+ public void testMissingStripedBlock() throws Exception {
+ final int numBlocks = 4;
+ createECFile(cluster, filePath, dirPath, numBlocks);
+
+ // make sure the file is complete in NN
+ final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
+ .getINode4Write(filePath.toString()).asFile();
+ assertFalse(fileNode.isUnderConstruction());
+ assertTrue(fileNode.isWithStripedBlocks());
+ BlockInfo[] blocks = fileNode.getBlocks();
+ assertEquals(numBlocks, blocks.length);
+ for (BlockInfo blk : blocks) {
+ assertTrue(blk.isStriped());
+ assertTrue(blk.isComplete());
+ assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
+ final BlockInfoStriped sb = (BlockInfoStriped) blk;
+ assertEquals(GROUP_SIZE, sb.numNodes());
+ }
+
+ final BlockManager bm = cluster.getNamesystem().getBlockManager();
+ BlockInfo firstBlock = fileNode.getBlocks()[0];
+ DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
+
+ DatanodeDescriptor secondDn = storageInfos[1].getDatanodeDescriptor();
+ assertEquals(numBlocks, secondDn.numBlocks());
+
+ bm.getDatanodeManager().removeDatanode(secondDn);
+
+ BlockManagerTestUtil.getComputedDatanodeWork(bm);
+
+ // all the recovery work will be scheduled on the last DN
+ DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE);
+ DatanodeDescriptor last =
+ bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
+ assertEquals("Counting the number of outstanding EC tasks", numBlocks,
+ last.getNumberOfBlocksToBeErasureCoded());
+ List<BlockECRecoveryInfo> recovery = last.getErasureCodeCommand(numBlocks);
+ for (BlockECRecoveryInfo info : recovery) {
+ assertEquals(1, info.targets.length);
+ assertEquals(last, info.targets[0].getDatanodeDescriptor());
+ assertEquals(GROUP_SIZE - 1, info.sources.length);
+ assertEquals(GROUP_SIZE - 1, info.liveBlockIndices.length);
+ }
+ }
+}