Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/12/19 00:58:02 UTC
hadoop git commit: HDFS-9173. Erasure Coding: Lease recovery for striped file. Contributed by Walter Su and Jing Zhao.
Repository: hadoop
Updated Branches:
refs/heads/trunk 85c246604 -> 61ab0440f
HDFS-9173. Erasure Coding: Lease recovery for striped file. Contributed by Walter Su and Jing Zhao.
Change-Id: I51703a61c9d8454f883028f3f6acb5729fde1b15
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/61ab0440
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/61ab0440
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/61ab0440
Branch: refs/heads/trunk
Commit: 61ab0440f7eaff0f631cbae0378403912f88d7ad
Parents: 85c2466
Author: Zhe Zhang <zh...@apache.org>
Authored: Fri Dec 18 15:57:48 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Fri Dec 18 15:57:48 2015 -0800
----------------------------------------------------------------------
.../hadoop/hdfs/DFSStripedOutputStream.java | 4 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 37 ++-
.../server/blockmanagement/DatanodeManager.java | 12 +-
.../server/datanode/BlockRecoveryWorker.java | 238 ++++++++++++++++-
.../hdfs/server/namenode/FSNamesystem.java | 58 ++---
.../server/protocol/BlockRecoveryCommand.java | 33 +++
.../hadoop-hdfs/src/main/proto/HdfsServer.proto | 6 +-
.../apache/hadoop/hdfs/StripedFileTestUtil.java | 10 +-
.../hadoop/hdfs/TestLeaseRecoveryStriped.java | 257 +++++++++++++++++++
.../hdfs/server/datanode/TestBlockRecovery.java | 34 ++-
.../TestCommitBlockSynchronization.java | 5 +-
.../namenode/TestRecoverStripedBlocks.java | 5 +-
13 files changed, 634 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
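For context: lease recovery is what a second client triggers when the original writer dies before closing a file, and with this change that path now handles striped (erasure-coded) files. A minimal sketch of the client-side trigger, assuming a DistributedFileSystem handle and a striped file whose writer has gone away (the class and method below are illustrative; only recoverLease() itself is the real API):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    // Minimal sketch: ask the NameNode to recover the lease on an abandoned file.
    // recoverLease() returns true once the file has been closed, so poll until
    // then, much like TestLeaseRecoveryStriped does further down in this patch.
    public class RecoverLeaseSketch {
      public static void recover(DistributedFileSystem dfs, Path file)
          throws java.io.IOException, InterruptedException {
        while (!dfs.recoverLease(file)) {
          Thread.sleep(1000);
        }
      }
    }
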
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 546fa92..e1ff844 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -35,6 +35,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag;
@@ -970,7 +971,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
}
}
- private void enqueueAllCurrentPackets() throws IOException {
+ @VisibleForTesting
+ void enqueueAllCurrentPackets() throws IOException {
int idx = streamers.indexOf(getCurrentStreamer());
for(int i = 0; i < streamers.size(); i++) {
final StripedDataStreamer si = setCurrentStreamer(i);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 76d5c24..2286203 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -873,6 +873,9 @@ Trunk (Unreleased)
HDFS-9373. Erasure coding: friendly log information for write operations
with some failed streamers. (Li Bo via zhz)
+ HDFS-9173. Erasure Coding: Lease recovery for striped file. (Walter Su and
+ Jing Zhao via zhz)
+
HDFS-9451. Clean up depreated umasks and related unit tests.
(Wei-Chiu Chuang via wheat9)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 9271e33..d7a793a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
@@ -359,6 +360,12 @@ public class PBHelper {
builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
if(b.getNewBlock() != null)
builder.setTruncateBlock(PBHelperClient.convert(b.getNewBlock()));
+ if (b instanceof RecoveringStripedBlock) {
+ RecoveringStripedBlock sb = (RecoveringStripedBlock) b;
+ builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
+ sb.getErasureCodingPolicy()));
+ builder.addAllBlockIndices(asList(sb.getBlockIndices()));
+ }
return builder.build();
}
@@ -372,6 +379,16 @@ public class PBHelper {
rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
b.getNewGenStamp());
}
+
+ if (b.hasEcPolicy()) {
+ List<Integer> BlockIndicesList = b.getBlockIndicesList();
+ int[] indices = new int[BlockIndicesList.size()];
+ for (int i = 0; i < BlockIndicesList.size(); i++) {
+ indices[i] = BlockIndicesList.get(i).shortValue();
+ }
+ rBlock = new RecoveringStripedBlock(rBlock, indices,
+ PBHelperClient.convertErasureCodingPolicy(b.getEcPolicy()));
+ }
return rBlock;
}
@@ -823,12 +840,20 @@ public class PBHelper {
build();
}
- private static List<Integer> convertIntArray(short[] liveBlockIndices) {
- List<Integer> liveBlockIndicesList = new ArrayList<>();
- for (short s : liveBlockIndices) {
- liveBlockIndicesList.add((int) s);
+ private static List<Integer> asList(int[] arr) {
+ List<Integer> list = new ArrayList<>(arr.length);
+ for (int s : arr) {
+ list.add(s);
+ }
+ return list;
+ }
+
+ private static List<Integer> asList(short[] arr) {
+ List<Integer> list = new ArrayList<>(arr.length);
+ for (int s : arr) {
+ list.add(s);
}
- return liveBlockIndicesList;
+ return list;
}
private static StorageTypesProto convertStorageTypesProto(
@@ -925,7 +950,7 @@ public class PBHelper {
builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
- builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
+ builder.addAllLiveBlockIndices(asList(liveBlockIndices));
builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
blockEcRecoveryInfo.getErasureCodingPolicy()));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index cdd5b9e..0c0c01a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -504,6 +505,7 @@ public class DatanodeManager {
public DatanodeStorageInfo[] getDatanodeStorageInfos(
DatanodeID[] datanodeID, String[] storageIDs,
String format, Object... args) throws UnregisteredNodeException {
+ storageIDs = storageIDs == null ? new String[0] : storageIDs;
if (datanodeID.length != storageIDs.length) {
final String err = (storageIDs.length == 0?
"Missing storageIDs: It is likely that the HDFS client,"
@@ -524,9 +526,11 @@ public class DatanodeManager {
continue;
}
final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
- storages[i] = dd.getStorageInfo(storageIDs[i]);
+ if (dd != null) {
+ storages[i] = dd.getStorageInfo(storageIDs[i]);
+ }
}
- return storages;
+ return storages;
}
/** Prints information about all datanodes. */
@@ -1366,6 +1370,10 @@ public class DatanodeManager {
} else {
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
uc.getBlockRecoveryId());
+ if (b.isStriped()) {
+ rBlock = new RecoveringStripedBlock(rBlock, uc.getBlockIndices(),
+ ((BlockInfoStriped) b).getErasureCodingPolicy());
+ }
}
brCommand.add(rBlock);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
index 4ad1168..db0c6ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
@@ -17,16 +17,21 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.ipc.RemoteException;
@@ -37,7 +42,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
/**
* This class handles the block recovery work commands.
@@ -78,6 +88,10 @@ public class BlockRecoveryWorker {
newLength);
}
+ public ReplicaRecoveryInfo getReplicaRecoveryInfo(){
+ return rInfo;
+ }
+
@Override
public String toString() {
return "block:" + rInfo + " node:" + id;
@@ -294,12 +308,8 @@ public class BlockRecoveryWorker {
// we never know the actual state of the replica on failed data-nodes.
// The recovery should be started over.
if (!failedList.isEmpty()) {
- StringBuilder b = new StringBuilder();
- for(DatanodeID id : failedList) {
- b.append("\n " + id);
- }
- throw new IOException("Cannot recover " + block + ", the following "
- + failedList.size() + " data-nodes failed {" + b + "\n}");
+ throw new IOException("Cannot recover " + block
+ + ", the following datanodes failed: " + failedList);
}
// Notify the name-node about successfully recovered replicas.
@@ -323,6 +333,215 @@ public class BlockRecoveryWorker {
}
}
+ /**
+ * blk_0 blk_1 blk_2 blk_3 blk_4 blk_5 blk_6 blk_7 blk_8
+ * 64k 64k 64k 64k 64k 64k 64k 64k 64k <-- stripe_0
+ * 64k 64k 64k 64k 64k 64k 64k 64k 64k
+ * 64k 64k 64k 64k 64k 64k 64k 61k <-- startStripeIdx
+ * 64k 64k 64k 64k 64k 64k 64k
+ * 64k 64k 64k 64k 64k 64k 59k
+ * 64k 64k 64k 64k 64k 64k
+ * 64k 64k 64k 64k 64k 64k <-- last full stripe
+ * 64k 64k 13k 64k 55k 3k <-- target last stripe
+ * 64k 64k 64k 1k
+ * 64k 64k 58k
+ * 64k 64k
+ * 64k 19k
+ * 64k <-- total visible stripe
+ *
+ * Due to different speed of streamers, the internal blocks in a block group
+ * could have different lengths when the block group isn't ended normally.
+ * The purpose of this class is to recover the UnderConstruction block group,
+ * so all internal blocks end at the same stripe.
+ *
+ * The steps:
+ * 1. get all block lengths from DataNodes.
+ * 2. calculate safe length, which is at the target last stripe.
+ * 3. decode and feed blk_6~8, make them end at last full stripe. (the last
+ * full stripe means the last decodable stripe.)
+ * 4. encode the target last stripe, with the remaining sequential data. In
+ * this case, the sequential data is 64k+64k+13k. Feed blk_6~8 the parity cells.
+ * Overwrite the parity cells if we have to.
+ * 5. truncate the stripes from visible stripe, to target last stripe.
+ * TODO: implement step 3,4
+ */
+ public class RecoveryTaskStriped {
+ private final RecoveringBlock rBlock;
+ private final ExtendedBlock block;
+ private final String bpid;
+ private final DatanodeInfo[] locs;
+ private final long recoveryId;
+
+ private final int[] blockIndices;
+ private final ErasureCodingPolicy ecPolicy;
+
+ RecoveryTaskStriped(RecoveringStripedBlock rBlock) {
+ this.rBlock = rBlock;
+ // TODO: support truncate
+ Preconditions.checkArgument(rBlock.getNewBlock() == null);
+
+ block = rBlock.getBlock();
+ bpid = block.getBlockPoolId();
+ locs = rBlock.getLocations();
+ recoveryId = rBlock.getNewGenerationStamp();
+ blockIndices = rBlock.getBlockIndices();
+ ecPolicy = rBlock.getErasureCodingPolicy();
+ }
+
+ protected void recover() throws IOException {
+ checkLocations(locs.length);
+
+ Map<Long, BlockRecord> syncBlocks = new HashMap<>(locs.length);
+ final int dataBlkNum = ecPolicy.getNumDataUnits();
+ final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits();
+ //check generation stamps
+ for (int i = 0; i < locs.length; i++) {
+ DatanodeID id = locs[i];
+ try {
+ DatanodeID bpReg = new DatanodeID(
+ datanode.getBPOfferService(bpid).bpRegistration);
+ InterDatanodeProtocol proxyDN = bpReg.equals(id) ?
+ datanode : DataNode.createInterDataNodeProtocolProxy(id, conf,
+ dnConf.socketTimeout, dnConf.connectToDnViaHostname);
+ ExtendedBlock internalBlk = new ExtendedBlock(block);
+ final long blockId = block.getBlockId() + blockIndices[i];
+ internalBlk.setBlockId(blockId);
+ ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN,
+ new RecoveringBlock(internalBlk, null, recoveryId));
+
+ if (info != null &&
+ info.getGenerationStamp() >= block.getGenerationStamp() &&
+ info.getNumBytes() > 0) {
+ final BlockRecord existing = syncBlocks.get(blockId);
+ if (existing == null ||
+ info.getNumBytes() > existing.rInfo.getNumBytes()) {
+ // if we have >1 replicas for the same internal block, we
+ // simply choose the one with larger length.
+ // TODO: better usage of redundant replicas
+ syncBlocks.put(blockId, new BlockRecord(id, proxyDN, info));
+ }
+ }
+ } catch (RecoveryInProgressException ripE) {
+ InterDatanodeProtocol.LOG.warn(
+ "Recovery for replica " + block + " on data-node " + id
+ + " is already in progress. Recovery id = "
+ + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
+ return;
+ } catch (IOException e) {
+ InterDatanodeProtocol.LOG.warn(
+ "Failed to obtain replica info for block (=" + block
+ + ") from datanode (=" + id + ")", e);
+ }
+ }
+ checkLocations(syncBlocks.size());
+
+ final long safeLength = getSafeLength(syncBlocks);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recovering block " + block
+ + ", length=" + block.getNumBytes() + ", safeLength=" + safeLength
+ + ", syncList=" + syncBlocks);
+ }
+
+ // If some internal blocks reach the safe length, convert them to RUR
+ List<BlockRecord> rurList = new ArrayList<>(locs.length);
+ for (BlockRecord r : syncBlocks.values()) {
+ int blockIndex = (int) (r.rInfo.getBlockId() & BLOCK_GROUP_INDEX_MASK);
+ long newSize = getInternalBlockLength(safeLength, ecPolicy.getCellSize(),
+ dataBlkNum, blockIndex);
+ if (r.rInfo.getNumBytes() >= newSize) {
+ rurList.add(r);
+ }
+ }
+ assert rurList.size() >= dataBlkNum : "incorrect safe length";
+
+ // Recovery the striped block by truncating internal blocks to the safe
+ // length. Abort if there is any failure in this step.
+ truncatePartialBlock(rurList, safeLength);
+
+ // notify Namenode the new size and locations
+ final DatanodeID[] newLocs = new DatanodeID[totalBlkNum];
+ final String[] newStorages = new String[totalBlkNum];
+ for (int i = 0; i < totalBlkNum; i++) {
+ newLocs[blockIndices[i]] = DatanodeID.EMPTY_DATANODE_ID;
+ newStorages[blockIndices[i]] = "";
+ }
+ for (BlockRecord r : rurList) {
+ int index = (int) (r.rInfo.getBlockId() &
+ HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
+ newLocs[index] = r.id;
+ newStorages[index] = r.storageID;
+ }
+ ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
+ safeLength, recoveryId);
+ DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid);
+ nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
+ newBlock.getNumBytes(), true, false, newLocs, newStorages);
+ }
+
+ private void truncatePartialBlock(List<BlockRecord> rurList,
+ long safeLength) throws IOException {
+ int cellSize = ecPolicy.getCellSize();
+ int dataBlkNum = ecPolicy.getNumDataUnits();
+ List<DatanodeID> failedList = new ArrayList<>();
+ for (BlockRecord r : rurList) {
+ int blockIndex = (int) (r.rInfo.getBlockId() & BLOCK_GROUP_INDEX_MASK);
+ long newSize = getInternalBlockLength(safeLength, cellSize, dataBlkNum,
+ blockIndex);
+ try {
+ r.updateReplicaUnderRecovery(bpid, recoveryId, r.rInfo.getBlockId(),
+ newSize);
+ } catch (IOException e) {
+ InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ + ", datanode=" + r.id + ")", e);
+ failedList.add(r.id);
+ }
+ }
+
+ // If any of the data-nodes failed, the recovery fails, because
+ // we never know the actual state of the replica on failed data-nodes.
+ // The recovery should be started over.
+ if (!failedList.isEmpty()) {
+ throw new IOException("Cannot recover " + block
+ + ", the following datanodes failed: " + failedList);
+ }
+ }
+
+ /**
+ * TODO: the current implementation depends on the assumption that the
+ * parity cells are only generated based on the full stripe. This is not
+ * true after we support hflush.
+ */
+ @VisibleForTesting
+ long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
+ final int cellSize = ecPolicy.getCellSize();
+ final int dataBlkNum = ecPolicy.getNumDataUnits();
+ Preconditions.checkArgument(syncBlocks.size() >= dataBlkNum);
+ final int stripeSize = dataBlkNum * cellSize;
+ long[] blockLengths = new long[syncBlocks.size()];
+ int i = 0;
+ for (BlockRecord r : syncBlocks.values()) {
+ ReplicaRecoveryInfo rInfo = r.getReplicaRecoveryInfo();
+ blockLengths[i++] = rInfo.getNumBytes();
+ }
+ Arrays.sort(blockLengths);
+ // A full stripe is a stripe that has at least dataBlkNum full cells.
+ // lastFullStripeIdx is the index of the last full stripe.
+ int lastFullStripeIdx =
+ (int) (blockLengths[blockLengths.length - dataBlkNum] / cellSize);
+ return lastFullStripeIdx * stripeSize; // return the safeLength
+ // TODO: Include lastFullStripeIdx+1 stripe in safeLength, if there exists
+ // such a stripe (and it must be partial).
+ }
+
+ private void checkLocations(int locationCount)
+ throws IOException {
+ if (locationCount < ecPolicy.getNumDataUnits()) {
+ throw new IOException(block + " does not have enough internal blocks" +
+ ", unable to start recovery. Locations=" + Arrays.asList(locs));
+ }
+ }
+ }
+
private static void logRecoverBlock(String who, RecoveringBlock rb) {
ExtendedBlock block = rb.getBlock();
DatanodeInfo[] targets = rb.getLocations();
@@ -379,8 +598,11 @@ public class BlockRecoveryWorker {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock(who, b);
- RecoveryTaskContiguous task = new RecoveryTaskContiguous(b);
- task.recover();
+ if (b.isStriped()) {
+ new RecoveryTaskStriped((RecoveringStripedBlock) b).recover();
+ } else {
+ new RecoveryTaskContiguous(b).recover();
+ }
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
}
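
The safe-length rule described in RecoveryTaskStriped's javadoc can be checked in isolation. A minimal, self-contained sketch, assuming the default RS-6-3 layout with 64 KiB cells (the class and method names are illustrative; the real logic is RecoveryTaskStriped#getSafeLength above): sort the reported internal-block lengths, take the dataBlkNum-th largest, and round it down to whole cells; every stripe up to that point still has at least dataBlkNum full cells and is therefore decodable.

    import java.util.Arrays;

    // Illustrative sketch: recompute the safe length of a partially written
    // striped block group from the per-internal-block lengths reported by the
    // datanodes. Assumes RS-6-3 with 64 KiB cells; not code from this patch.
    public class SafeLengthSketch {
      static long safeLength(long[] internalBlockLengths, int dataBlkNum, int cellSize) {
        long[] lengths = internalBlockLengths.clone();
        Arrays.sort(lengths);
        // The last full (i.e. still decodable) stripe is the last one for which
        // at least dataBlkNum internal blocks have a complete cell.
        long lastFullStripeIdx = lengths[lengths.length - dataBlkNum] / cellSize;
        return lastFullStripeIdx * dataBlkNum * (long) cellSize;
      }

      public static void main(String[] args) {
        int cell = 64 * 1024;
        // First case of BLOCK_LENGTHS_SUITE in TestLeaseRecoveryStriped below:
        long[] lengths = {11 * cell, 10 * cell, 9 * cell, 8 * cell, 7 * cell,
            6 * cell, 5 * cell, 4 * cell, 3 * cell};
        // Sorted, the 6th-largest length is 6 cells, so 6 full stripes survive:
        // safe length = 6 stripes * 6 data blocks * 64 KiB = 36 cells.
        System.out.println(safeLength(lengths, 6, cell) == 36L * cell); // true
      }
    }
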
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b25c5f7..97cb6fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -197,7 +197,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
@@ -3285,57 +3284,42 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
storedBlock.setNumBytes(newlength);
}
- // find the DatanodeDescriptor objects
- ArrayList<DatanodeDescriptor> trimmedTargets =
- new ArrayList<DatanodeDescriptor>(newtargets.length);
- ArrayList<String> trimmedStorages =
- new ArrayList<String>(newtargets.length);
- if (newtargets.length > 0) {
- for (int i = 0; i < newtargets.length; ++i) {
- // try to get targetNode
- DatanodeDescriptor targetNode =
- blockManager.getDatanodeManager().getDatanode(newtargets[i]);
- if (targetNode != null) {
- trimmedTargets.add(targetNode);
- trimmedStorages.add(newtargetstorages[i]);
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("DatanodeDescriptor (=" + newtargets[i] + ") not found");
- }
- }
- }
- if ((closeFile) && !trimmedTargets.isEmpty()) {
+ // Find the target DatanodeStorageInfos. If not found because of invalid
+ // or empty DatanodeID/StorageID, the slot at the same offset in dsInfos is
+ // null.
+ final DatanodeStorageInfo[] dsInfos = blockManager.getDatanodeManager().
+ getDatanodeStorageInfos(newtargets, newtargetstorages,
+ "src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d",
+ src, oldBlock, newgenerationstamp, newlength);
+
+ if (closeFile && dsInfos != null) {
// the file is getting closed. Insert block locations into blockManager.
// Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive.
- for (int i = 0; i < trimmedTargets.size(); i++) {
- DatanodeStorageInfo storageInfo =
- trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
- if (storageInfo != null) {
+ for (int i = 0; i < dsInfos.length; i++) {
+ if (dsInfos[i] != null) {
if(copyTruncate) {
- storageInfo.addBlock(truncatedBlock, truncatedBlock);
+ dsInfos[i].addBlock(truncatedBlock, truncatedBlock);
} else {
- storageInfo.addBlock(storedBlock, storedBlock);
+ Block bi = new Block(storedBlock);
+ if (storedBlock.isStriped()) {
+ bi.setBlockId(bi.getBlockId() + i);
+ }
+ dsInfos[i].addBlock(storedBlock, bi);
}
}
}
}
// add pipeline locations into the INodeUnderConstruction
- DatanodeStorageInfo[] trimmedStorageInfos =
- blockManager.getDatanodeManager().getDatanodeStorageInfos(
- trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
- trimmedStorages.toArray(new String[trimmedStorages.size()]),
- "src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d",
- src, oldBlock, newgenerationstamp, newlength);
-
if(copyTruncate) {
- iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
+ iFile.convertLastBlockToUC(truncatedBlock, dsInfos);
} else {
- iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos);
+ iFile.convertLastBlockToUC(storedBlock, dsInfos);
if (closeFile) {
blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
storedBlock, oldGenerationStamp, oldNumBytes,
- trimmedStorageInfos);
+ dsInfos);
}
}
}
@@ -3343,7 +3327,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (closeFile) {
if(copyTruncate) {
closeFileCommitBlocks(src, iFile, truncatedBlock);
- if(!iFile.isBlockInLatestSnapshot((BlockInfoContiguous) storedBlock)) {
+ if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
blockManager.removeBlock(storedBlock);
}
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
index 3adc85c..8dc9d39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -76,6 +77,13 @@ public class BlockRecoveryCommand extends DatanodeCommand {
this.recoveryBlock = recoveryBlock;
}
+ public RecoveringBlock(RecoveringBlock rBlock) {
+ super(rBlock.getBlock(), rBlock.getLocations(), rBlock.getStorageIDs(),
+ rBlock.getStorageTypes());
+ this.newGenerationStamp = rBlock.newGenerationStamp;
+ this.recoveryBlock = rBlock.recoveryBlock;
+ }
+
/**
* Return the new generation stamp of the block,
* which also plays role of the recovery id.
@@ -92,6 +100,31 @@ public class BlockRecoveryCommand extends DatanodeCommand {
}
}
+ public static class RecoveringStripedBlock extends RecoveringBlock {
+ private final int[] blockIndices;
+ private final ErasureCodingPolicy ecPolicy;
+
+ public RecoveringStripedBlock(RecoveringBlock rBlock, int[] blockIndices,
+ ErasureCodingPolicy ecPolicy) {
+ super(rBlock);
+ this.blockIndices = blockIndices;
+ this.ecPolicy = ecPolicy;
+ }
+
+ public int[] getBlockIndices() {
+ return blockIndices;
+ }
+
+ public ErasureCodingPolicy getErasureCodingPolicy() {
+ return ecPolicy;
+ }
+
+ @Override
+ public boolean isStriped() {
+ return true;
+ }
+ }
+
/**
* Create empty BlockRecoveryCommand.
*/
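
The blockIndices array carried by RecoveringStripedBlock tells the primary datanode which internal block each location in the block group holds. A small illustrative sketch of the ID arithmetic used by RecoveryTaskStriped above (block.getBlockId() + blockIndices[i] to address an internal block, blockId & BLOCK_GROUP_INDEX_MASK to get the index back); the example group ID is made up, and the mask value 15 is an assumption about HdfsServerConstants.BLOCK_GROUP_INDEX_MASK:

    // Illustration only: deriving an internal block ID from a block-group ID and
    // a block index, and recovering the index from that ID.
    public class BlockIndexSketch {
      static final long BLOCK_GROUP_INDEX_MASK = 15; // assumed value of the HDFS constant

      public static void main(String[] args) {
        long groupId = -9223372036854775792L; // hypothetical striped block-group ID
        int index = 4;                        // from RecoveringStripedBlock#getBlockIndices
        long internalId = groupId + index;    // internal block ID on that datanode
        int recovered = (int) (internalId & BLOCK_GROUP_INDEX_MASK);
        System.out.println(recovered == index); // true
      }
    }
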
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
index 66b2a33..453ba29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
@@ -120,6 +120,10 @@ message RecoveringBlockProto {
required uint64 newGenStamp = 1; // New genstamp post recovery
required LocatedBlockProto block = 2; // Block to be recovered
optional BlockProto truncateBlock = 3; // New block for recovery (truncate)
+
+ optional ErasureCodingPolicyProto ecPolicy = 4;
+ // block indices of striped internal blocks for each storage in LocatedBlock
+ repeated uint32 blockIndices = 5;
}
/**
@@ -195,4 +199,4 @@ message NamenodeRegistrationProto {
}
required StorageInfoProto storageInfo = 3; // Node information
optional NamenodeRoleProto role = 4 [default = NAMENODE]; // Namenode role
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 9942a2d..2f54078 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -59,13 +59,11 @@ public class StripedFileTestUtil {
public static final short NUM_DATA_BLOCKS = (short) 6;
public static final short NUM_PARITY_BLOCKS = (short) 3;
public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
- public static final int BLOCK_STRIPE_SIZE = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS;
-
- public static final int stripesPerBlock = 4;
- public static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
- public static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
- public static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
+ static int stripesPerBlock = 4;
+ public static int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
+ static int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
+ static int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
static byte[] generateBytes(int cnt) {
byte[] bytes = new byte[cnt];
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
new file mode 100644
index 0000000..38ee67a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
@@ -0,0 +1,257 @@
+package org.apache.hadoop.hdfs;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+public class TestLeaseRecoveryStriped {
+ public static final Log LOG = LogFactory.getLog(TestLeaseRecoveryStriped.class);
+
+ private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
+ private static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS;
+ private static final int CELL_SIZE = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
+ private static final int STRIPE_SIZE = NUM_DATA_BLOCKS * CELL_SIZE;
+ private static final int STRIPES_PER_BLOCK = 15;
+ private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
+ private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
+ private static final int bytesPerChecksum = 512;
+
+ static {
+ GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
+ StripedFileTestUtil.stripesPerBlock = STRIPES_PER_BLOCK;
+ StripedFileTestUtil.blockSize = BLOCK_SIZE;
+ StripedFileTestUtil.BLOCK_GROUP_SIZE = BLOCK_GROUP_SIZE;
+ }
+
+ static private final String fakeUsername = "fakeUser1";
+ static private final String fakeGroup = "supergroup";
+
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem dfs;
+ private Configuration conf;
+ private final Path dir = new Path("/" + this.getClass().getSimpleName());
+ final Path p = new Path(dir, "testfile");
+
+ @Before
+ public void setup() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+ false);
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+ final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+ cluster.waitActive();
+ dfs = cluster.getFileSystem();
+ dfs.mkdirs(dir);
+ dfs.setErasureCodingPolicy(dir, null);
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ public static final int[][][] BLOCK_LENGTHS_SUITE = {
+ {{ 11 * CELL_SIZE,10 * CELL_SIZE, 9 * CELL_SIZE,
+ 8 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE,
+ 5 * CELL_SIZE, 4 * CELL_SIZE, 3 * CELL_SIZE},
+ {36 * CELL_SIZE}},
+
+ {{ 3 * CELL_SIZE, 4 * CELL_SIZE, 5 * CELL_SIZE,
+ 6 * CELL_SIZE, 7 * CELL_SIZE, 8 * CELL_SIZE,
+ 9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE},
+ {36 * CELL_SIZE}},
+
+ {{ 11 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE,
+ 5 * CELL_SIZE, 4 * CELL_SIZE, 2 * CELL_SIZE,
+ 9 * CELL_SIZE,10 * CELL_SIZE,11 * CELL_SIZE},
+ {36 * CELL_SIZE}},
+
+ {{ 8 * CELL_SIZE + bytesPerChecksum,
+ 7 * CELL_SIZE + bytesPerChecksum * 2,
+ 6 * CELL_SIZE + bytesPerChecksum * 2,
+ 5 * CELL_SIZE - bytesPerChecksum * 3,
+ 4 * CELL_SIZE - bytesPerChecksum * 4,
+ 3 * CELL_SIZE - bytesPerChecksum * 4,
+ 9 * CELL_SIZE, 10 * CELL_SIZE, 11 * CELL_SIZE},
+ {36 * CELL_SIZE}},
+ };
+
+ @Test
+ public void testLeaseRecovery() throws Exception {
+ for(int i=0; i < BLOCK_LENGTHS_SUITE.length; i++){
+ int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0];
+ int safeLength = BLOCK_LENGTHS_SUITE[i][1][0];
+ try {
+ runTest(blockLengths, safeLength);
+ } catch (Throwable e){
+ String msg = "failed testCase at i=" + i + ", blockLengths="
+ + Arrays.toString(blockLengths) + "\n"
+ + StringUtils.stringifyException(e);
+ Assert.fail(msg);
+ }
+ }
+ }
+
+ private void runTest(int[] blockLengths, int safeLength) throws Exception {
+ writePartialBlocks(blockLengths);
+ recoverLease();
+
+ List<Long> oldGS = new ArrayList<>();
+ oldGS.add(1001L);
+ StripedFileTestUtil.checkData(dfs, p, safeLength,
+ new ArrayList<DatanodeInfo>(), oldGS);
+ // After recovery, storages are reported by the primary DN. We should verify
+ // storages reported by blockReport.
+ cluster.restartNameNode(true);
+ StripedFileTestUtil.checkData(dfs, p, safeLength,
+ new ArrayList<DatanodeInfo>(), oldGS);
+ }
+
+ private void writePartialBlocks(int[] blockLengths) throws Exception {
+ final FSDataOutputStream out = dfs.create(p);
+ final DFSStripedOutputStream stripedOut
+ = (DFSStripedOutputStream) out.getWrappedStream();
+ int length = (STRIPES_PER_BLOCK - 1) * STRIPE_SIZE;
+ int[] posToKill = getPosToKill(blockLengths);
+ int checkingPos = nextCheckingPos(posToKill, 0);
+ try {
+ for (int pos = 0; pos < length; pos++) {
+ out.write(StripedFileTestUtil.getByte(pos));
+ if (pos == checkingPos) {
+ for (int index : getIndexToStop(posToKill, pos)) {
+ out.flush();
+ stripedOut.enqueueAllCurrentPackets();
+ StripedDataStreamer s = stripedOut.getStripedDataStreamer(index);
+ waitStreamerAllAcked(s);
+ waitByteSent(s, blockLengths[index]);
+ stopBlockStream(s);
+ }
+ checkingPos = nextCheckingPos(posToKill, pos);
+ }
+ }
+ } finally {
+ DFSTestUtil.abortStream(stripedOut);
+ }
+ }
+
+ private int nextCheckingPos(int[] posToKill, int curPos) {
+ int checkingPos = Integer.MAX_VALUE;
+ for (int i = 0; i < posToKill.length; i++) {
+ if (posToKill[i] > curPos) {
+ checkingPos = Math.min(checkingPos, posToKill[i]);
+ }
+ }
+ return checkingPos;
+ }
+
+ private int[] getPosToKill(int[] blockLengths) {
+ int[] posToKill = new int[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS];
+ for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
+ int numStripe = (blockLengths[i] - 1) / CELL_SIZE;
+ posToKill[i] = numStripe * STRIPE_SIZE
+ + i * CELL_SIZE + blockLengths[i] % CELL_SIZE;
+ if (blockLengths[i] % CELL_SIZE == 0) {
+ posToKill[i] += CELL_SIZE;
+ }
+ }
+ for (int i = NUM_DATA_BLOCKS; i < NUM_DATA_BLOCKS+NUM_PARITY_BLOCKS; i++) {
+ Preconditions.checkArgument(blockLengths[i] % CELL_SIZE == 0);
+ int numStripe = (blockLengths[i]) / CELL_SIZE;
+ posToKill[i] = numStripe * STRIPE_SIZE;
+ }
+ return posToKill;
+ }
+
+ private List<Integer> getIndexToStop(int[] posToKill, int pos){
+ List<Integer> indices=new LinkedList<>();
+ for(int i=0;i<posToKill.length;i++){
+ if(pos==posToKill[i]){
+ indices.add(i);
+ }
+ }
+ return indices;
+ }
+
+ private void waitByteSent(final StripedDataStreamer s, final long byteSent)
+ throws Exception {
+ try {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return s.bytesSent >= byteSent;
+ }
+ }, 100, 3000);
+ } catch (TimeoutException e) {
+ throw new IOException("Timeout waiting for streamer " + s +". Sent="
+ + s.bytesSent + ", expected="+byteSent);
+ }
+ }
+
+ private void stopBlockStream(StripedDataStreamer s) throws Exception {
+ IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
+ Whitebox.setInternalState(s, "blockStream",
+ new DataOutputStream(nullOutputStream));
+ }
+
+ private void recoverLease() throws Exception {
+ final DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser(conf);
+ try {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ return dfs2.recoverLease(p);
+ } catch (IOException e) {
+ return false;
+ }
+ }
+ }, 5000, 24000);
+ } catch (TimeoutException e) {
+ throw new IOException("Timeout waiting for recoverLease()");
+ }
+ }
+
+ private FileSystem getFSAsAnotherUser(final Configuration c)
+ throws IOException, InterruptedException {
+ return FileSystem.get(FileSystem.getDefaultUri(c), c,
+ UserGroupInformation.createUserForTesting(fakeUsername,
+ new String[]{fakeGroup}).getUserName());
+ }
+
+ public static void waitStreamerAllAcked(DataStreamer s) throws IOException {
+ long toWaitFor = s.getLastQueuedSeqno();
+ s.waitForAckedSeqno(toWaitFor);
+ }
+}
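
For reference, a worked sketch of how getPosToKill in the test above maps a target internal-block length to a position in the logical write stream (numbers assume the test's 64 KiB cells and six data blocks; the helper class is illustrative, not part of the test):

    // Illustrative only: logical stream position at which data block i has
    // received roughly `length` bytes, mirroring getPosToKill above.
    public class PosToKillSketch {
      static int posToKill(int i, int length, int cellSize, int dataBlkNum) {
        int stripeSize = cellSize * dataBlkNum;
        int numStripe = (length - 1) / cellSize; // stripe holding block i's last byte
        int pos = numStripe * stripeSize + i * cellSize + length % cellSize;
        if (length % cellSize == 0) {
          pos += cellSize;                       // land just past block i's full cell
        }
        return pos;
      }

      public static void main(String[] args) {
        int cell = 64 * 1024;
        // To leave data block 2 with 13 KiB, stop its streamer once the writer is
        // 2 full cells plus 13 KiB into stripe 0 of the logical stream.
        System.out.println(posToKill(2, 13 * 1024, cell, 6) == 2 * cell + 13 * 1024); // true
      }
    }
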
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 4e9b4f4..82f5423 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -39,7 +39,9 @@ import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -68,8 +71,10 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker.BlockRecord;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -93,6 +98,8 @@ import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
+import static org.apache.hadoop.hdfs.TestLeaseRecoveryStriped.BLOCK_LENGTHS_SUITE;
+
/**
* This tests if sync all replicas in block recovery works correctly
*/
@@ -243,8 +250,7 @@ public class TestBlockRecovery {
DatanodeInfo[] locs = new DatanodeInfo[]{
mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
- RecoveringBlock rBlock = new RecoveringBlock(block,
- locs, RECOVERY_ID);
+ RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
BlockRecord record1 = new BlockRecord(
DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn1, replica1);
@@ -745,4 +751,28 @@ public class TestBlockRecovery {
assertTrue(exceptionThrown);
}
}
+
+ @Test
+ public void testSafeLength() throws Exception {
+ ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager
+ .getSystemDefaultPolicy();
+ RecoveringStripedBlock rBlockStriped = new RecoveringStripedBlock(rBlock,
+ new int[9], ecPolicy);
+ BlockRecoveryWorker recoveryWorker = new BlockRecoveryWorker(dn);
+ BlockRecoveryWorker.RecoveryTaskStriped recoveryTask =
+ recoveryWorker.new RecoveryTaskStriped(rBlockStriped);
+
+ for (int i = 0; i < BLOCK_LENGTHS_SUITE.length; i++) {
+ int[] blockLengths = BLOCK_LENGTHS_SUITE[i][0];
+ int safeLength = BLOCK_LENGTHS_SUITE[i][1][0];
+ Map<Long, BlockRecord> syncList = new HashMap<>();
+ for (int id = 0; id < blockLengths.length; id++) {
+ ReplicaRecoveryInfo rInfo = new ReplicaRecoveryInfo(id,
+ blockLengths[id], 0, null);
+ syncList.put((long) id, new BlockRecord(null, null, rInfo));
+ }
+ Assert.assertEquals("BLOCK_LENGTHS_SUITE[" + i + "]", safeLength,
+ recoveryTask.getSafeLength(syncList));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
index d4c5924..0460ad1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
@@ -199,14 +199,15 @@ public class TestCommitBlockSynchronization {
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[]{
new DatanodeID("0.0.0.0", "nonexistantHost", "1", 0, 0, 0, 0)};
+ String[] storageIDs = new String[]{"fake-storage-ID"};
ExtendedBlock lastBlock = new ExtendedBlock();
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, true,
- false, newTargets, null);
+ false, newTargets, storageIDs);
// Repeat the call to make sure it returns true
namesystemSpy.commitBlockSynchronization(
- lastBlock, genStamp, length, true, false, newTargets, null);
+ lastBlock, genStamp, length, true, false, newTargets, storageIDs);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61ab0440/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index 101601e..3a5c135 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -46,7 +46,6 @@ import java.util.List;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -183,7 +182,7 @@ public class TestRecoverStripedBlocks {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, StripedFileTestUtil.blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 2)
.build();
try {
@@ -191,7 +190,7 @@ public class TestRecoverStripedBlocks {
DistributedFileSystem fs = cluster.getFileSystem();
BlockManager bm = cluster.getNamesystem().getBlockManager();
fs.getClient().setErasureCodingPolicy("/", null);
- int fileLen = NUM_DATA_BLOCKS * blockSize;
+ int fileLen = NUM_DATA_BLOCKS * StripedFileTestUtil.blockSize;
Path p = new Path("/test2RecoveryTasksForSameBlockGroup");
final byte[] data = new byte[fileLen];
DFSTestUtil.writeFile(fs, p, data);