You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/19 07:15:45 UTC
[41/50] hadoop git commit: HDFS-7678. Erasure coding: DFSInputStream
with decode functionality (pread). Contributed by Zhe Zhang.
HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread). Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a265c211
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a265c211
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a265c211
Branch: refs/heads/HDFS-7285
Commit: a265c2111b0b6d6241478fb14f6909422f6cb5c6
Parents: e2c1d18
Author: Zhe Zhang <zh...@apache.org>
Authored: Mon May 11 21:10:23 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon May 18 22:11:10 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../hadoop/hdfs/DFSStripedInputStream.java | 164 ++++--
.../erasurecode/ErasureCodingWorker.java | 10 +-
.../hadoop/hdfs/util/StripedBlockUtil.java | 517 +++++++++++++++++--
.../hadoop/hdfs/TestDFSStripedInputStream.java | 97 +++-
.../hadoop/hdfs/TestWriteReadStripedFile.java | 49 ++
6 files changed, 768 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a265c211/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index c7d01c7..0acf746 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -195,3 +195,6 @@
HDFS-8355. Erasure Coding: Refactor BlockInfo and BlockInfoUnderConstruction.
(Tsz Wo Nicholas Sze via jing9)
+
+ HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread).
+ (Zhe Zhang)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a265c211/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 7425e75..7678fae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -21,15 +21,27 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.io.erasurecode.ECSchema;
+
import org.apache.hadoop.net.NetUtils;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -37,10 +49,12 @@ import org.apache.htrace.TraceScope;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.Set;
+import java.util.Collection;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CompletionService;
@@ -51,7 +65,6 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
-
/******************************************************************************
* DFSStripedInputStream reads from striped block groups, illustrated below:
*
@@ -125,6 +138,7 @@ public class DFSStripedInputStream extends DFSInputStream {
private final short parityBlkNum;
/** the buffer for a complete stripe */
private ByteBuffer curStripeBuf;
+ private final ECSchema schema;
/**
* indicate the start/end offset of the current buffered stripe in the
* block group
@@ -137,6 +151,7 @@ public class DFSStripedInputStream extends DFSInputStream {
super(dfsClient, src, verifyChecksum);
assert schema != null;
+ this.schema = schema;
cellSize = schema.getChunkSize();
dataBlkNum = (short) schema.getNumDataUnits();
parityBlkNum = (short) schema.getNumParityUnits();
@@ -472,12 +487,10 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
@Override
protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
- LocatedBlock lb = super.getBlockAt(blkStartOffset);
- assert lb instanceof LocatedStripedBlock : "NameNode should return a " +
- "LocatedStripedBlock for a striped file";
+ LocatedBlock lb = getBlockGroupAt(blkStartOffset);
- int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
- % dataBlkNum);
+ int idx = (int) ((blkStartOffset - lb.getStartOffset())
+ % (dataBlkNum + parityBlkNum));
// If indexing information is returned, iterate through the index array
// to find the entry for position idx in the group
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
@@ -509,48 +522,121 @@ public class DFSStripedInputStream extends DFSInputStream {
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
- Map<Future<Void>, Integer> futures = new HashMap<>();
- CompletionService<Void> stripedReadsService =
- new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
- int len = (int) (end - start + 1);
-
// Refresh the striped block group
LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
+ AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, blockGroup,
+ start, end, buf, offset);
+ for (AlignedStripe stripe : stripes) {
+ fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap);
+ }
+ }
- // Planning the portion of I/O for each shard
- ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
- len, offset);
-
+ private void fetchOneStripe(LocatedStripedBlock blockGroup,
+ byte[] buf, AlignedStripe alignedStripe, Map<ExtendedBlock,
+ Set<DatanodeInfo>> corruptedBlockMap) throws IOException {
+ Map<Future<Void>, Integer> futures = new HashMap<>();
+ CompletionService<Void> service =
+ new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+ if (alignedStripe.getSpanInBlock() == 0) {
+ DFSClient.LOG.warn("Trying to read an empty stripe from" + blockGroup);
+ return;
+ }
// Parse group to get chosen DN location
LocatedBlock[] blks = StripedBlockUtil.
parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);
-
for (short i = 0; i < dataBlkNum; i++) {
- ReadPortion rp = readPortions[i];
- if (rp.getReadLength() <= 0) {
- continue;
+ if (alignedStripe.chunks[i] != null
+ && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+ fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
+ corruptedBlockMap);
}
- DatanodeInfo loc = blks[i].getLocations()[0];
- StorageType type = blks[i].getStorageTypes()[0];
- DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
- loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
- type);
- Callable<Void> readCallable = getFromOneDataNode(dnAddr,
- blks[i].getStartOffset(), rp.getStartOffsetInBlock(),
- rp.getStartOffsetInBlock() + rp.getReadLength() - 1, buf,
- rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
- Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
- DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
- futures.put(getFromDNRequest, (int) i);
}
+ // Input buffers for potential decode operation, which remain null until
+ // the first read failure
+ byte[][] decodeInputs = null;
while (!futures.isEmpty()) {
try {
- waitNextCompletion(stripedReadsService, futures);
+ StripingChunkReadResult r = getNextCompletedStripedRead(
+ service, futures, 0);
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + alignedStripe);
+ }
+ StripingChunk returnedChunk = alignedStripe.chunks[r.index];
+ Preconditions.checkNotNull(returnedChunk);
+ Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
+ if (r.state == StripingChunkReadResult.SUCCESSFUL) {
+ returnedChunk.state = StripingChunk.FETCHED;
+ alignedStripe.fetchedChunksNum++;
+ if (alignedStripe.fetchedChunksNum == dataBlkNum) {
+ clearFutures(futures.keySet());
+ break;
+ }
+ } else {
+ returnedChunk.state = StripingChunk.MISSING;
+ alignedStripe.missingChunksNum++;
+ if (alignedStripe.missingChunksNum > parityBlkNum) {
+ clearFutures(futures.keySet());
+ throw new IOException("Too many blocks are missing: " + alignedStripe);
+ }
+ // When seeing first missing block, initialize decode input buffers
+ if (decodeInputs == null) {
+ decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
+ }
+ for (int i = 0; i < alignedStripe.chunks.length; i++) {
+ StripingChunk chunk = alignedStripe.chunks[i];
+ Preconditions.checkNotNull(chunk);
+ if (chunk.state == StripingChunk.REQUESTED && i <= dataBlkNum) {
+ fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
+ corruptedBlockMap);
+ }
+ }
+ }
} catch (InterruptedException ie) {
- // Ignore and retry
+ String err = "Read request interrupted";
+ DFSClient.LOG.error(err);
+ clearFutures(futures.keySet());
+ // Don't decode if read interrupted
+ throw new InterruptedIOException(err);
}
}
+
+ if (alignedStripe.missingChunksNum > 0) {
+ decodeAndFillBuffer(decodeInputs, buf, alignedStripe,
+ dataBlkNum, parityBlkNum);
+ }
+ }
+
+ /**
+ * Schedule a single read request to an internal block
+ * @param block The internal block
+ * @param index Index of the internal block in the group
+ * @param corruptedBlockMap Map of corrupted blocks
+ */
+ private void fetchOneStripingChunk(Map<Future<Void>, Integer> futures,
+ final CompletionService<Void> service, final LocatedBlock block,
+ final AlignedStripe alignedStripe, final int index,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+ DatanodeInfo loc = block.getLocations()[0];
+ StorageType type = block.getStorageTypes()[0];
+ DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
+ loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
+ type);
+ StripingChunk chunk = alignedStripe.chunks[index];
+ chunk.state = StripingChunk.PENDING;
+ Callable<Void> readCallable = getFromOneDataNode(dnAddr,
+ block.getStartOffset(), alignedStripe.getOffsetInBlock(),
+ alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1, chunk.buf,
+ chunk.getOffsets(), chunk.getLengths(),
+ corruptedBlockMap, index);
+ Future<Void> getFromDNRequest = service.submit(readCallable);
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Submitting striped read request for " + index +
+ ". Info of the block: " + block + ", offset in block is " +
+ alignedStripe.getOffsetInBlock() + ", end is " +
+ (alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1));
+ }
+ futures.put(getFromDNRequest, index);
}
private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
@@ -609,4 +695,12 @@ public class DFSStripedInputStream extends DFSInputStream {
throw new UnsupportedOperationException(
"Not support enhanced byte buffer access.");
}
+
+ /** A variation of {@link DFSInputStream#cancelAll} */
+ private void clearFutures(Collection<Future<Void>> futures) {
+ for (Future<Void> future : futures) {
+ future.cancel(false);
+ }
+ futures.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a265c211/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 5ede508..eedb191 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripedReadResult;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
@@ -462,10 +462,10 @@ public final class ErasureCodingWorker {
int nsuccess = 0;
while (!futures.isEmpty()) {
try {
- StripedReadResult result =
+ StripingChunkReadResult result =
StripedBlockUtil.getNextCompletedStripedRead(
readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
- if (result.state == StripedReadResult.SUCCESSFUL) {
+ if (result.state == StripingChunkReadResult.SUCCESSFUL) {
success[nsuccess++] = result.index;
if (nsuccess >= dataBlkNum) {
// cancel remaining reads if we read successfully from minimum
@@ -474,14 +474,14 @@ public final class ErasureCodingWorker {
futures.clear();
break;
}
- } else if (result.state == StripedReadResult.FAILED) {
+ } else if (result.state == StripingChunkReadResult.FAILED) {
// If read failed for some source, we should not use it anymore
// and schedule read from a new source.
StripedReader failedReader = stripedReaders.get(result.index);
closeBlockReader(failedReader.blockReader);
failedReader.blockReader = null;
scheduleNewRead(used);
- } else if (result.state == StripedReadResult.TIMEOUT) {
+ } else if (result.state == StripingChunkReadResult.TIMEOUT) {
// If timeout, we also schedule a new read.
scheduleNewRead(used);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a265c211/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 45bbf6b..f7ae88a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -22,16 +22,18 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSStripedOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@@ -85,7 +87,7 @@ public class StripedBlockUtil {
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
- bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
+ bg.getStartOffset() + idxInBlockGroup, bg.isCorrupt(),
null);
}
@@ -238,33 +240,37 @@ public class StripedBlockUtil {
/**
* Get the next completed striped read task
*
- * @return {@link StripedReadResult} indicating the status of the read task
+ * @return {@link StripingChunkReadResult} indicating the status of the read task
* succeeded, and the block index of the task. If the method times
* out without getting any completed read tasks, -1 is returned as
* block index.
* @throws InterruptedException
*/
- public static StripedReadResult getNextCompletedStripedRead(
+ public static StripingChunkReadResult getNextCompletedStripedRead(
CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
final long threshold) throws InterruptedException {
Preconditions.checkArgument(!futures.isEmpty());
- Preconditions.checkArgument(threshold > 0);
Future<Void> future = null;
try {
- future = readService.poll(threshold, TimeUnit.MILLISECONDS);
+ if (threshold > 0) {
+ future = readService.poll(threshold, TimeUnit.MILLISECONDS);
+ } else {
+ future = readService.take();
+ }
if (future != null) {
future.get();
- return new StripedReadResult(futures.remove(future),
- StripedReadResult.SUCCESSFUL);
+ return new StripingChunkReadResult(futures.remove(future),
+ StripingChunkReadResult.SUCCESSFUL);
} else {
- return new StripedReadResult(StripedReadResult.TIMEOUT);
+ return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
}
} catch (ExecutionException e) {
- return new StripedReadResult(futures.remove(future),
- StripedReadResult.FAILED);
+ DFSClient.LOG.error("ExecutionException " + e);
+ return new StripingChunkReadResult(futures.remove(future),
+ StripingChunkReadResult.FAILED);
} catch (CancellationException e) {
- return new StripedReadResult(futures.remove(future),
- StripedReadResult.CANCELLED);
+ return new StripingChunkReadResult(futures.remove(future),
+ StripingChunkReadResult.CANCELLED);
}
}
@@ -291,26 +297,247 @@ public class StripedBlockUtil {
}
/**
- * This class represents the portion of I/O associated with each block in the
- * striped block group.
+ * Initialize the decoding input buffers based on the chunk states in an
+ * AlignedStripe
*/
- public static class ReadPortion {
+ public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
+ int dataBlkNum, int parityBlkNum) {
+ byte[][] decodeInputs =
+ new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
+ for (int i = 0; i < alignedStripe.chunks.length; i++) {
+ StripingChunk chunk = alignedStripe.chunks[i];
+ if (chunk == null) {
+ alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
+ alignedStripe.chunks[i].offsetsInBuf.add(0);
+ alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock());
+ } else if (chunk.state == StripingChunk.FETCHED) {
+ int posInBuf = 0;
+ for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
+ System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j),
+ decodeInputs[i], posInBuf, chunk.lengthsInBuf.get(j));
+ posInBuf += chunk.lengthsInBuf.get(j);
+ }
+ } else if (chunk.state == StripingChunk.ALLZERO) {
+ Arrays.fill(decodeInputs[i], (byte)0);
+ }
+ }
+ return decodeInputs;
+ }
+
+ /**
+ * Decode based on the given input buffers and schema
+ */
+ public static void decodeAndFillBuffer(final byte[][] decodeInputs, byte[] buf,
+ AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum) {
+ int[] decodeIndices = new int[parityBlkNum];
+ int pos = 0;
+ for (int i = 0; i < alignedStripe.chunks.length; i++) {
+ if (alignedStripe.chunks[i].state != StripingChunk.FETCHED &&
+ alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+ decodeIndices[pos++] = i;
+ }
+ }
+
+ byte[][] outputs = new byte[parityBlkNum][(int) alignedStripe.getSpanInBlock()];
+ RSRawDecoder rsRawDecoder = new RSRawDecoder();
+ rsRawDecoder.initialize(dataBlkNum, parityBlkNum, (int) alignedStripe.getSpanInBlock());
+ rsRawDecoder.decode(decodeInputs, decodeIndices, outputs);
+
+ for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+ StripingChunk chunk = alignedStripe.chunks[i];
+ if (chunk.state == StripingChunk.MISSING) {
+ int srcPos = 0;
+ for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
+ //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
+// System.arraycopy(outputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
+// chunk.lengthsInBuf.get(j));
+ Arrays.fill(buf, chunk.offsetsInBuf.get(j),
+ chunk.offsetsInBuf.get(j) + chunk.lengthsInBuf.get(j), (byte)7);
+ srcPos += chunk.lengthsInBuf.get(j);
+ }
+ }
+ }
+ }
+
+ /**
+ * This method divides a requested byte range into an array of
+ * {@link AlignedStripe}
+ *
+ *
+ * At most 5 stripes will be generated from each logical range
+ * TODO: cleanup and get rid of planReadPortions
+ */
+ public static AlignedStripe[] divideByteRangeIntoStripes (
+ ECSchema ecSchema, LocatedStripedBlock blockGroup, long start, long end,
+ byte[] buf, int offsetInBuf) {
+ // TODO: change ECSchema naming to use cell size instead of chunk size
+
+ // Step 0: analyze range and calculate basic parameters
+ int cellSize = ecSchema.getChunkSize();
+ int dataBlkNum = ecSchema.getNumDataUnits();
+ int len = (int) (end - start + 1);
+ int firstCellIdxInBG = (int) (start / cellSize);
+ int lastCellIdxInBG = (int) (end / cellSize);
+ int firstCellSize = Math.min(cellSize - (int) (start % cellSize), len);
+ long firstCellOffsetInBlk = start % cellSize;
+ int lastCellSize = lastCellIdxInBG == firstCellIdxInBG ?
+ firstCellSize : (int) (end % cellSize) + 1;
+
+ // Step 1: get the unmerged ranges on each internal block
+ // TODO: StripingCell should carry info on size and start offset (HDFS-8320)
+ VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema,
+ firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
+ lastCellSize);
+
+ // Step 2: merge into at most 5 stripes
+ AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
+
+ // Step 3: calculate each chunk's position in destination buffer
+ calcualteChunkPositionsInBuf(ecSchema, blockGroup, buf, offsetInBuf,
+ firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
+ lastCellSize, stripes);
+
+ // Step 4: prepare ALLZERO blocks
+ prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
+
+ return stripes;
+ }
+
+ private static VerticalRange[] getRangesForInternalBlocks (ECSchema ecSchema,
+ int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
+ long firstCellOffsetInBlk, int lastCellSize) {
+ int cellSize = ecSchema.getChunkSize();
+ int dataBlkNum = ecSchema.getNumDataUnits();
+
+ StripingCell firstCell = new StripingCell(ecSchema, firstCellIdxInBG);
+ StripingCell lastCell = new StripingCell(ecSchema, lastCellIdxInBG);
+
+ VerticalRange ranges[] = new VerticalRange[dataBlkNum];
+ ranges[firstCell.idxInStripe] =
+ new VerticalRange(firstCellOffsetInBlk, firstCellSize);
+ for (int i = firstCellIdxInBG + 1; i < lastCellIdxInBG; i++) {
+ // iterate through all cells and update the array of VerticalRanges
+ StripingCell cell = new StripingCell(ecSchema, i);
+ if (ranges[cell.idxInStripe] == null) {
+ ranges[cell.idxInStripe] = new VerticalRange(
+ cell.idxInInternalBlk * cellSize, cellSize);
+ } else {
+ ranges[cell.idxInStripe].spanInBlock += cellSize;
+ }
+ }
+ if (ranges[lastCell.idxInStripe] == null) {
+ ranges[lastCell.idxInStripe] = new VerticalRange(
+ lastCell.idxInInternalBlk * cellSize, lastCellSize);
+ } else if (lastCell.idxInBlkGroup != firstCell.idxInBlkGroup) {
+ ranges[lastCell.idxInStripe].spanInBlock += lastCellSize;
+ }
+
+ return ranges;
+ }
+
+ private static AlignedStripe[] mergeRangesForInternalBlocks(ECSchema ecSchema,
+ VerticalRange[] ranges) {
+ int dataBlkNum = ecSchema.getNumDataUnits();
+ int parityBlkNum = ecSchema.getNumParityUnits();
+ List<AlignedStripe> stripes = new ArrayList<>();
+ SortedSet<Long> stripePoints = new TreeSet<>();
+ for (VerticalRange r : ranges) {
+ if (r != null) {
+ stripePoints.add(r.offsetInBlock);
+ stripePoints.add(r.offsetInBlock + r.spanInBlock);
+ }
+ }
+
+ long prev = -1;
+ for (long point : stripePoints) {
+ if (prev >= 0) {
+ stripes.add(new AlignedStripe(prev, point - prev,
+ dataBlkNum + parityBlkNum));
+ }
+ prev = point;
+ }
+ return stripes.toArray(new AlignedStripe[stripes.size()]);
+ }
+
+ private static void calcualteChunkPositionsInBuf(ECSchema ecSchema,
+ LocatedStripedBlock blockGroup, byte[] buf, int offsetInBuf,
+ int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
+ long firstCellOffsetInBlk, int lastCellSize, AlignedStripe[] stripes) {
+ int cellSize = ecSchema.getChunkSize();
+ int dataBlkNum = ecSchema.getNumDataUnits();
+ // Step 3: calculate each chunk's position in destination buffer
/**
- * startOffsetInBlock
- * |
- * v
- * |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->|
+ * | <--------------- AlignedStripe --------------->|
+ *
+ * |<- length_0 ->|<-- length_1 -->|<- length_2 ->|
* +------------------+------------------+----------------+
- * | cell_0 | cell_3 | cell_6 | <- blk_0
+ * | cell_0_0_0 | cell_3_1_0 | cell_6_2_0 | <- blk_0
* +------------------+------------------+----------------+
* _/ \_______________________
* | |
- * v offsetsInBuf[0] v offsetsInBuf[1]
- * +------------------------------------------------------+
- * | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf
- * | (partial) | (from blk_1 and blk_2) | |
- * +------------------------------------------------------+
+ * v offset_0 v offset_1
+ * +----------------------------------------------------------+
+ * | cell_0_0_0 | cell_1_0_1 and cell_2_0_2 |cell_3_1_0 ...| <- buf
+ * | (partial) | (from blk_1 and blk_2) | |
+ * +----------------------------------------------------------+
+ *
+ * Cell indexing convention defined in {@link StripingCell}
*/
+ int done = 0;
+ for (int i = firstCellIdxInBG; i <= lastCellIdxInBG; i++) {
+ StripingCell cell = new StripingCell(ecSchema, i);
+ long cellStart = i == firstCellIdxInBG ?
+ firstCellOffsetInBlk : cell.idxInInternalBlk * cellSize;
+ int cellLen;
+ if (i == firstCellIdxInBG) {
+ cellLen = firstCellSize;
+ } else if (i == lastCellIdxInBG) {
+ cellLen = lastCellSize;
+ } else {
+ cellLen = cellSize;
+ }
+ long cellEnd = cellStart + cellLen - 1;
+ for (AlignedStripe s : stripes) {
+ long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
+ long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
+ long overlapEnd = Math.min(cellEnd, stripeEnd);
+ int overLapLen = (int) (overlapEnd - overlapStart + 1);
+ if (overLapLen <= 0) {
+ continue;
+ }
+ if (s.chunks[cell.idxInStripe] == null) {
+ s.chunks[cell.idxInStripe] = new StripingChunk(buf);
+ }
+
+ s.chunks[cell.idxInStripe].offsetsInBuf.
+ add((int)(offsetInBuf + done + overlapStart - cellStart));
+ s.chunks[cell.idxInStripe].lengthsInBuf.add(overLapLen);
+ }
+ done += cellLen;
+ }
+ }
+
+ private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
+ byte[] buf, AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
+ for (AlignedStripe s : stripes) {
+ for (int i = 0; i < dataBlkNum; i++) {
+ long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(),
+ cellSize, dataBlkNum, i);
+ if (internalBlkLen <= s.getOffsetInBlock()) {
+ Preconditions.checkState(s.chunks[i] == null);
+ s.chunks[i] = new StripingChunk(buf);
+ s.chunks[i].state = StripingChunk.ALLZERO;
+ }
+ }
+ }
+ }
+
+ /**
+ * This class represents the portion of I/O associated with each block in the
+ * striped block group.
+ * TODO: consolidate ReadPortion with AlignedStripe
+ */
+ public static class ReadPortion {
private long startOffsetInBlock = 0;
private int readLength = 0;
public final List<Integer> offsetsInBuf = new ArrayList<>();
@@ -350,11 +577,234 @@ public class StripedBlockUtil {
}
/**
+ * The unit of encoding used in {@link DFSStripedOutputStream}
+ * | <------- Striped Block Group -------> |
+ * blk_0 blk_1 blk_2
+ * | | |
+ * v v v
+ * +----------+ +----------+ +----------+
+ * |cell_0_0_0| |cell_1_0_1| |cell_2_0_2|
+ * +----------+ +----------+ +----------+
+ * |cell_3_1_0| |cell_4_1_1| |cell_5_1_2| <- {@link idxInBlkGroup} = 5
+ * +----------+ +----------+ +----------+ {@link idxInInternalBlk} = 1
+ * {@link idxInStripe} = 2
+ * A StripingCell is a special instance of {@link StripingChunk} whose offset
+ * and size align with the cell used when writing data.
+ * TODO: consider parity cells
+ */
+ public static class StripingCell {
+ public final ECSchema schema;
+ /** Logical order in a block group, used when doing I/O to a block group */
+ public final int idxInBlkGroup;
+ public final int idxInInternalBlk;
+ public final int idxInStripe;
+
+ public StripingCell(ECSchema ecSchema, int idxInBlkGroup) {
+ this.schema = ecSchema;
+ this.idxInBlkGroup = idxInBlkGroup;
+ this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits();
+ this.idxInStripe = idxInBlkGroup -
+ this.idxInInternalBlk * ecSchema.getNumDataUnits();
+ }
+
+ public StripingCell(ECSchema ecSchema, int idxInInternalBlk,
+ int idxInStripe) {
+ this.schema = ecSchema;
+ this.idxInInternalBlk = idxInInternalBlk;
+ this.idxInStripe = idxInStripe;
+ this.idxInBlkGroup =
+ idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe;
+ }
+ }
+
+ /**
+ * Given a requested byte range on a striped block group, an AlignedStripe
+ * represents a {@link VerticalRange} that is aligned with both the byte range
+ * and boundaries of all internal blocks. As illustrated in the diagram, any
+ * given byte range on a block group leads to 1 to 5 AlignedStripes.
+ *
+ * |<-------- Striped Block Group -------->|
+ * blk_0 blk_1 blk_2 blk_3 blk_4
+ * +----+ | +----+ +----+
+ * |full| | | | | | <- AlignedStripe0:
+ * +----+ |~~~~| | |~~~~| |~~~~| 1st cell is partial
+ * |part| | | | | | | | <- AlignedStripe1: byte range
+ * +----+ +----+ +----+ | |~~~~| |~~~~| doesn't start at 1st block
+ * |full| |full| |full| | | | | |
+ * |cell| |cell| |cell| | | | | | <- AlignedStripe2 (full stripe)
+ * | | | | | | | | | | |
+ * +----+ +----+ +----+ | |~~~~| |~~~~|
+ * |full| |part| | | | | | <- AlignedStripe3: byte range
+ * |~~~~| +----+ | |~~~~| |~~~~| doesn't end at last block
+ * | | | | | | | <- AlignedStripe4:
+ * +----+ | +----+ +----+ last cell is partial
+ * |
+ * <---- data blocks ----> | <--- parity --->
+ *
+ * An AlignedStripe is the basic unit of reading from a striped block group,
+ * because within the AlignedStripe, all internal blocks can be processed in
+ * a uniform manner.
+ *
+ * The coverage of an AlignedStripe on an internal block is represented as a
+ * {@link StripingChunk}.
+ * To simplify the logic of reading a logical byte range from a block group,
+ * a StripingChunk is either completely in the requested byte range or
+ * completely outside the requested byte range.
+ */
+ public static class AlignedStripe {
+ public VerticalRange range;
+ /** status of each chunk in the stripe */
+ public final StripingChunk[] chunks;
+ public int fetchedChunksNum = 0;
+ public int missingChunksNum = 0;
+
+ public AlignedStripe(long offsetInBlock, long length, int width) {
+ Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+ this.range = new VerticalRange(offsetInBlock, length);
+ this.chunks = new StripingChunk[width];
+ }
+
+ public AlignedStripe(VerticalRange range, int width) {
+ this.range = range;
+ this.chunks = new StripingChunk[width];
+ }
+
+ public boolean include(long pos) {
+ return range.include(pos);
+ }
+
+ public long getOffsetInBlock() {
+ return range.offsetInBlock;
+ }
+
+ public long getSpanInBlock() {
+ return range.spanInBlock;
+ }
+
+ @Override
+ public String toString() {
+ return "Offset=" + range.offsetInBlock + ", length=" + range.spanInBlock +
+ ", fetchedChunksNum=" + fetchedChunksNum +
+ ", missingChunksNum=" + missingChunksNum;
+ }
+ }
+
+ /**
+ * A simple utility class representing an arbitrary vertical inclusive range
+ * starting at {@link offsetInBlock} and lasting for {@link spanInBlock}
+ * bytes in an internal block. Note that VerticalRange doesn't necessarily
+ * align with {@link StripingCell}.
+ *
+ * |<- Striped Block Group ->|
+ * blk_0
+ * |
+ * v
+ * +-----+
+ * |~~~~~| <-- {@link offsetInBlock}
+ * | | ^
+ * | | |
+ * | | | {@link spanInBlock}
+ * | | v
+ * |~~~~~| ---
+ * | |
+ * +-----+
+ */
+ public static class VerticalRange {
+ /** start offset in the internal block (inclusive) */
+ public long offsetInBlock;
+ /** length of the range in bytes */
+ public long spanInBlock;
+
+ public VerticalRange(long offsetInBlock, long length) {
+ Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+ this.offsetInBlock = offsetInBlock;
+ this.spanInBlock = length;
+ }
+
+ /** whether a position is in the range */
+ public boolean include(long pos) {
+ return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
+ }
+ }
+
+ /**
+ * Indicates the coverage of an {@link AlignedStripe} on an internal block,
+ * and the state of the chunk in the context of the read request.
+ *
+ * |<---------------- Striped Block Group --------------->|
+ * blk_0 blk_1 blk_2 blk_3 blk_4
+ * +---------+ | +----+ +----+
+ * null null |REQUESTED| | |null| |null| <- AlignedStripe0
+ * +---------+ |---------| | |----| |----|
+ * null |REQUESTED| |REQUESTED| | |null| |null| <- AlignedStripe1
+ * +---------+ +---------+ +---------+ | +----+ +----+
+ * |REQUESTED| |REQUESTED| ALLZERO | |null| |null| <- AlignedStripe2
+ * +---------+ +---------+ | +----+ +----+
+ * <----------- data blocks ------------> | <--- parity --->
+ *
+ * The class also carries {@link buf}, {@link offsetsInBuf}, and
+ * {@link lengthsInBuf} to define how read task for this chunk should deliver
+ * the returned data.
+ */
+ public static class StripingChunk {
+ /** Chunk has been successfully fetched */
+ public static final int FETCHED = 0x01;
+ /** Chunk has encountered a failure when being fetched */
+ public static final int MISSING = 0x02;
+ /** Chunk being fetched (fetching task is in-flight) */
+ public static final int PENDING = 0x04;
+ /**
+ * Chunk is requested either by application or for decoding, need to
+ * schedule read task
+ */
+ public static final int REQUESTED = 0X08;
+ /**
+ * Internal block is short and has no overlap with chunk. Chunk is considered
+ * as all-zero bytes in codec calculations.
+ */
+ public static final int ALLZERO = 0X0f;
+
+ /**
+ * If a chunk is completely in requested range, the state transition is:
+ * REQUESTED (when AlignedStripe created) -> PENDING -> {FETCHED | MISSING}
+ * If a chunk is completely outside requested range (including parity
+ * chunks), state transition is:
+ * null (AlignedStripe created) -> REQUESTED (upon failure) -> PENDING ...
+ */
+ public int state = REQUESTED;
+ public byte[] buf;
+ public List<Integer> offsetsInBuf;
+ public List<Integer> lengthsInBuf;
+
+ public StripingChunk(byte[] buf) {
+ this.buf = buf;
+ this.offsetsInBuf = new ArrayList<>();
+ this.lengthsInBuf = new ArrayList<>();
+ }
+
+ public int[] getOffsets() {
+ int[] offsets = new int[offsetsInBuf.size()];
+ for (int i = 0; i < offsets.length; i++) {
+ offsets[i] = offsetsInBuf.get(i);
+ }
+ return offsets;
+ }
+
+ public int[] getLengths() {
+ int[] lens = new int[this.lengthsInBuf.size()];
+ for (int i = 0; i < lens.length; i++) {
+ lens[i] = this.lengthsInBuf.get(i);
+ }
+ return lens;
+ }
+ }
+
+ /**
* This class represents result from a striped read request.
* If the task was successful or the internal computation failed,
* an index is also returned.
*/
- public static class StripedReadResult {
+ public static class StripingChunkReadResult {
public static final int SUCCESSFUL = 0x01;
public static final int FAILED = 0x02;
public static final int TIMEOUT = 0x04;
@@ -363,18 +813,23 @@ public class StripedBlockUtil {
public final int index;
public final int state;
- public StripedReadResult(int state) {
+ public StripingChunkReadResult(int state) {
Preconditions.checkArgument(state == TIMEOUT,
"Only timeout result should return negative index.");
this.index = -1;
this.state = state;
}
- public StripedReadResult(int index, int state) {
+ public StripingChunkReadResult(int index, int state) {
Preconditions.checkArgument(state != TIMEOUT,
"Timeout result should return negative index.");
this.index = index;
this.state = state;
}
+
+ @Override
+ public String toString() {
+ return "(index=" + index + ", state =" + state + ")";
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a265c211/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 3f79933..452cc2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -133,8 +134,102 @@ public class TestDFSStripedInputStream {
byte[] readBuffer = new byte[readSize];
int ret = in.read(0, readBuffer, 0, readSize);
+ byte[] expected = new byte[readSize];
+ /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ for (int k = 0; k < CELLSIZE; k++) {
+ int posInBlk = i * CELLSIZE + k;
+ int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+ expected[posInFile] = SimulatedFSDataset.simulatedByte(
+ new Block(bg.getBlock().getBlockId() + j), posInBlk);
+ }
+ }
+ }
+
assertEquals(readSize, ret);
- // TODO: verify read results with patterned data from HDFS-8117
+ assertArrayEquals(expected, readBuffer);
+ }
+
+ @Test
+ public void testPreadWithDNFailure() throws Exception {
+ final int numBlocks = 4;
+ final int failedDNIdx = 2;
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ NUM_STRIPE_PER_BLOCK, false);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCK_GROUP_SIZE);
+
+ assert lbs.get(0) instanceof LocatedStripedBlock;
+ LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+ for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
+ Block blk = new Block(bg.getBlock().getBlockId() + i,
+ NUM_STRIPE_PER_BLOCK * CELLSIZE,
+ bg.getBlock().getGenerationStamp());
+ blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+ cluster.injectBlocks(i, Arrays.asList(blk),
+ bg.getBlock().getBlockPoolId());
+ }
+ DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
+ ErasureCodingSchemaManager.getSystemDefaultSchema());
+ int readSize = BLOCK_GROUP_SIZE;
+ byte[] readBuffer = new byte[readSize];
+ byte[] expected = new byte[readSize];
+ cluster.stopDataNode(failedDNIdx);
+ /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ for (int k = 0; k < CELLSIZE; k++) {
+ int posInBlk = i * CELLSIZE + k;
+ int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+ expected[posInFile] = SimulatedFSDataset.simulatedByte(
+ new Block(bg.getBlock().getBlockId() + j), posInBlk);
+ }
+ }
+ }
+
+ // Update the expected content for decoded data
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+ byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
+ int[] missingBlkIdx = new int[]{failedDNIdx, DATA_BLK_NUM+1, DATA_BLK_NUM+2};
+ byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE];
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
+ if (j != failedDNIdx) {
+ System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
+ }
+ }
+ for (int k = 0; k < CELLSIZE; k++) {
+ int posInBlk = i * CELLSIZE + k;
+ decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
+ new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
+ }
+// RSRawDecoder rsRawDecoder = new RSRawDecoder();
+// rsRawDecoder.initialize(DATA_BLK_NUM, PARITY_BLK_NUM, CELLSIZE);
+// rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+ int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
+// System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
+ //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
+ Arrays.fill(expected, posInBuf, posInBuf + CELLSIZE, (byte)7);
+ }
+ int delta = 10;
+ int done = 0;
+ // read a small delta, shouldn't trigger decode
+ // |cell_0 |
+ // |10 |
+ done += in.read(0, readBuffer, 0, delta);
+ assertEquals(delta, done);
+ // both head and trail cells are partial
+ // |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 |
+ // |256K - 10|missing|256K|256K|256K - 10|not in range|
+ done += in.read(delta, readBuffer, delta,
+ CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
+ assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
+ // read the rest
+ done += in.read(done, readBuffer, done, readSize - done);
+ assertEquals(readSize, done);
+ assertArrayEquals(expected, readBuffer);
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a265c211/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index 5c6f449..57d6eb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -18,10 +18,13 @@
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -321,4 +324,50 @@ public class TestWriteReadStripedFile {
Assert.assertArrayEquals(bytes, result.array());
}
}
+
+ @Test
+ public void testWritePreadWithDNFailure() throws IOException {
+ final int failedDNIdx = 2;
+ final int length = cellSize * (dataBlocks + 2);
+ Path testPath = new Path("/foo");
+ final byte[] bytes = generateBytes(length);
+ DFSTestUtil.writeFile(fs, testPath, new String(bytes));
+
+ // shut down the DN that holds the internal block at index failedDNIdx
+ BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
+ cellSize);
+ String name = (locs[0].getNames())[failedDNIdx];
+ for (DataNode dn : cluster.getDataNodes()) {
+ int port = dn.getXferPort();
+ if (name.contains(Integer.toString(port))) {
+ dn.shutdown();
+ break;
+ }
+ }
+
+ // pread
+ int startOffsetInFile = cellSize * 5;
+ try (FSDataInputStream fsdis = fs.open(testPath)) {
+ byte[] buf = new byte[length];
+ int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
+ Assert.assertEquals("The length of file should be the same to write size",
+ length - startOffsetInFile, readLen);
+
+ RSRawDecoder rsRawDecoder = new RSRawDecoder();
+ rsRawDecoder.initialize(dataBlocks, parityBlocks, 1);
+ byte[] expected = new byte[readLen];
+ for (int i = startOffsetInFile; i < length; i++) {
+ //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
+ if ((i / cellSize) % dataBlocks == failedDNIdx) {
+ expected[i - startOffsetInFile] = (byte)7;
+ } else {
+ expected[i - startOffsetInFile] = getByte(i);
+ }
+ }
+ for (int i = startOffsetInFile; i < length; i++) {
+ Assert.assertEquals("Byte at " + i + " should be the same",
+ expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
+ }
+ }
+ }
}