Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2016/09/08 18:54:45 UTC
hadoop git commit: HDFS-8901. Use ByteBuffer in striping positional read. Contributed by Sammi Chen and Kai Zheng.
Repository: hadoop
Updated Branches:
refs/heads/trunk 20a20c2f6 -> 401db4fc6
HDFS-8901. Use ByteBuffer in striping positional read. Contributed by Sammi Chen and Kai Zheng.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/401db4fc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/401db4fc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/401db4fc
Branch: refs/heads/trunk
Commit: 401db4fc65140979fe7665983e36905e886df971
Parents: 20a20c2
Author: Zhe Zhang <zh...@apache.org>
Authored: Thu Sep 8 11:54:33 2016 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Thu Sep 8 11:54:33 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/util/DataChecksum.java | 2 +-
.../org/apache/hadoop/hdfs/DFSInputStream.java | 68 ++++---
.../hadoop/hdfs/DFSStripedInputStream.java | 24 ++-
.../hadoop/hdfs/util/StripedBlockUtil.java | 177 ++++++++++---------
.../hadoop/hdfs/TestDFSStripedInputStream.java | 121 ++++++++++++-
.../hadoop/hdfs/util/TestStripedBlockUtil.java | 22 +--
6 files changed, 281 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 3a53ed9..6982a92 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -304,7 +304,7 @@ public class DataChecksum implements Checksum {
bytesPerChecksum, checksums.array(), crcsOffset, fileName, basePos);
return;
}
- if (NativeCrc32.isAvailable()) {
+ if (NativeCrc32.isAvailable() && data.isDirect()) {
NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
fileName, basePos);
} else {
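The new data.isDirect() guard matters because the JNI checksum routine addresses the buffer's memory directly, which is only possible for direct buffers; a heap ByteBuffer now falls through to the pure-Java path instead. For illustration, a minimal standalone sketch of this dispatch, with hypothetical nativeVerify/javaVerify helpers standing in for NativeCrc32 and the Java fallback:

import java.nio.ByteBuffer;

public class ChecksumDispatchSketch {
  private static final boolean NATIVE_AVAILABLE = true; // assume probed at load time

  static void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums) {
    if (NATIVE_AVAILABLE && data.isDirect()) {
      nativeVerify(data, checksums);  // zero-copy JNI path; requires a direct buffer
    } else {
      javaVerify(data, checksums);    // pure-Java path works on any buffer
    }
  }

  private static void nativeVerify(ByteBuffer data, ByteBuffer checksums) { }
  private static void javaVerify(ByteBuffer data, ByteBuffer checksums) { }
}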
http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 7a10ba4..31fa897 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -533,7 +533,8 @@ public class DFSInputStream extends FSInputStream
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
*/
- private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
+ private synchronized DatanodeInfo blockSeekTo(long target)
+ throws IOException {
if (target >= getFileLength()) {
throw new IOException("Attempted to read past end of file");
}
@@ -962,14 +963,14 @@ public class DFSInputStream extends FSInputStream
}
protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
- byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
+ ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
block = refreshLocatedBlock(block);
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
try {
actualGetFromOneDataNode(addressPair, block, start, end,
- buf, offset, corruptedBlocks);
+ buf, corruptedBlocks);
return;
} catch (IOException e) {
checkInterrupted(e); // check if the read has been interrupted
@@ -988,12 +989,10 @@ public class DFSInputStream extends FSInputStream
return new Callable<ByteBuffer>() {
@Override
public ByteBuffer call() throws Exception {
- byte[] buf = bb.array();
- int offset = bb.position();
try (TraceScope ignored = dfsClient.getTracer().
newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
- actualGetFromOneDataNode(datanode, block, start, end, buf,
- offset, corruptedBlocks);
+ actualGetFromOneDataNode(datanode, block, start, end, bb,
+ corruptedBlocks);
return bb;
}
}
@@ -1007,13 +1006,12 @@ public class DFSInputStream extends FSInputStream
* @param block the located block containing the requested data
* @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block
- * @param buf the given byte array into which the data is read
- * @param offset the offset in buf
+ * @param buf the given byte buffer into which the data is read
* @param corruptedBlocks map recording list of datanodes with corrupted
* block replica
*/
void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
- final long startInBlk, final long endInBlk, byte[] buf, int offset,
+ final long startInBlk, final long endInBlk, ByteBuffer buf,
CorruptedBlocks corruptedBlocks)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
@@ -1031,7 +1029,22 @@ public class DFSInputStream extends FSInputStream
DFSClientFaultInjector.get().fetchFromDatanodeException();
reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info);
- int nread = reader.readAll(buf, offset, len);
+
// Behave exactly as the readAll() call does.
+ ByteBuffer tmp = buf.duplicate();
+ tmp.limit(tmp.position() + len);
+ tmp = tmp.slice();
+ int nread = 0;
+ int ret;
+ while (true) {
+ ret = reader.read(tmp);
+ if (ret <= 0) {
+ break;
+ }
+ nread += ret;
+ }
+ buf.position(buf.position() + nread);
+
IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
dfsClient.updateFileSystemReadStats(
reader.getNetworkDistance(), nread);
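The duplicate/limit/slice sequence above reproduces the old reader.readAll(buf, offset, len) semantics on a ByteBuffer: all reads land in a bounded window over the caller's buffer, and the caller's position is advanced exactly once at the end. A standalone sketch of the same loop, written against java.nio.channels.ReadableByteChannel purely for illustration (any reader with an int read(ByteBuffer) contract would do):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class ReadAllSketch {
  static int readFully(ReadableByteChannel ch, ByteBuffer buf, int len)
      throws IOException {
    ByteBuffer tmp = buf.duplicate();     // shares bytes, independent position/limit
    tmp.limit(tmp.position() + len);      // cap the window at the requested length
    tmp = tmp.slice();                    // zero-based view; cannot overrun buf
    int nread = 0;
    while (true) {
      int ret = ch.read(tmp);
      if (ret <= 0) {
        break;                            // stream exhausted
      }
      nread += ret;
    }
    buf.position(buf.position() + nread); // advance the caller's buffer once
    return nread;
  }
}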
@@ -1098,7 +1111,7 @@ public class DFSInputStream extends FSInputStream
* time. We then wait on which ever read returns first.
*/
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
- long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
+ long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
final DfsClientConf conf = dfsClient.getConf();
ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
@@ -1130,8 +1143,8 @@ public class DFSInputStream extends FSInputStream
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
if (future != null) {
ByteBuffer result = future.get();
- System.arraycopy(result.array(), result.position(), buf, offset,
- len);
+ result.flip();
+ buf.put(result);
return;
}
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
@@ -1173,8 +1186,8 @@ public class DFSInputStream extends FSInputStream
// cancel the rest.
cancelAll(futures);
dfsClient.getHedgedReadMetrics().incHedgedReadWins();
- System.arraycopy(result.array(), result.position(), buf, offset,
- len);
+ result.flip();
+ buf.put(result);
return;
} catch (InterruptedException ie) {
// Ignore and retry
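In both hedged-read paths, flip() followed by put() replaces the old System.arraycopy on result.array(): flip() turns the just-filled buffer into read mode (limit = bytes written, position = 0), and put() then transfers exactly those bytes. Unlike array(), this also works when the result is a direct buffer with no accessible backing array. A tiny sketch of the idiom:

import java.nio.ByteBuffer;

public class FlipPutSketch {
  public static void main(String[] args) {
    ByteBuffer result = ByteBuffer.allocate(16);
    result.put(new byte[]{1, 2, 3});     // a worker wrote 3 bytes
    ByteBuffer dest = ByteBuffer.allocate(16);

    result.flip();                       // read mode: position=0, limit=3
    dest.put(result);                    // transfers exactly the 3 filled bytes

    System.out.println(dest.position()); // prints 3
  }
}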
@@ -1244,7 +1257,8 @@ public class DFSInputStream extends FSInputStream
* access key from its memory since it's considered expired based on
* the estimated expiration date.
*/
- if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+ if (ex instanceof InvalidBlockTokenException ||
+ ex instanceof InvalidToken) {
DFSClient.LOG.info("Access token was invalid when connecting to "
+ targetAddr + " : " + ex);
return true;
@@ -1272,7 +1286,8 @@ public class DFSInputStream extends FSInputStream
try (TraceScope scope = dfsClient.
newReaderTraceScope("DFSInputStream#byteArrayPread",
src, position, length)) {
- int retLen = pread(position, buffer, offset, length);
+ ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
+ int retLen = pread(position, bb);
if (retLen < length) {
dfsClient.addRetLenToReaderScope(scope, retLen);
}
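The byte[] entry point stays source-compatible by wrapping the array: ByteBuffer.wrap(buffer, offset, length) yields a heap buffer with position = offset and limit = offset + length, which is precisely the window pread is allowed to fill. A one-line check of that contract:

import java.nio.ByteBuffer;

public class WrapSketch {
  public static void main(String[] args) {
    byte[] buffer = new byte[1024];
    ByteBuffer bb = ByteBuffer.wrap(buffer, 100, 512);
    System.out.println(bb.position());   // 100: first writable index
    System.out.println(bb.remaining());  // 512: exactly the requested window
  }
}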
@@ -1280,7 +1295,7 @@ public class DFSInputStream extends FSInputStream
}
}
- private int pread(long position, byte[] buffer, int offset, int length)
+ private int pread(long position, ByteBuffer buffer)
throws IOException {
// sanity checks
dfsClient.checkOpen();
@@ -1292,6 +1307,7 @@ public class DFSInputStream extends FSInputStream
if ((position < 0) || (position >= filelen)) {
return -1;
}
+ int length = buffer.remaining();
int realLen = length;
if ((position + length) > filelen) {
realLen = (int)(filelen - position);
@@ -1304,14 +1320,16 @@ public class DFSInputStream extends FSInputStream
CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset();
- long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
+ int bytesToRead = (int) Math.min(remaining,
+ blk.getBlockSize() - targetStart);
+ long targetEnd = targetStart + bytesToRead - 1;
try {
if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
hedgedFetchBlockByteRange(blk, targetStart,
- targetStart + bytesToRead - 1, buffer, offset, corruptedBlocks);
+ targetEnd, buffer, corruptedBlocks);
} else {
- fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
- buffer, offset, corruptedBlocks);
+ fetchBlockByteRange(blk, targetStart, targetEnd,
+ buffer, corruptedBlocks);
}
} finally {
// Check and report if any block replicas are corrupted.
@@ -1323,7 +1341,6 @@ public class DFSInputStream extends FSInputStream
remaining -= bytesToRead;
position += bytesToRead;
- offset += bytesToRead;
}
assert remaining == 0 : "Wrong number of bytes read.";
return realLen;
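Note how the manual offset bookkeeping (offset += bytesToRead) disappears from the loop: each fetch now advances the shared buffer's position as a side effect of writing into it, so only the file position and remaining count need tracking. A schematic sketch, with fetch() as a stand-in for fetchBlockByteRange:

import java.nio.ByteBuffer;

public class PositionTrackingSketch {
  // Stand-in for fetchBlockByteRange: writing via put() advances buf's position.
  static void fetch(ByteBuffer buf, int len) {
    buf.put(new byte[len]);
  }

  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(30);
    int remaining = 30;
    while (remaining > 0) {
      int bytesToRead = Math.min(remaining, 10); // one "block" per pass
      fetch(buffer, bytesToRead);
      remaining -= bytesToRead;                  // no separate offset variable
    }
    System.out.println(buffer.position());       // prints 30
  }
}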
@@ -1457,7 +1474,8 @@ public class DFSInputStream extends FSInputStream
* If another node could not be found, then returns false.
*/
@Override
- public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+ public synchronized boolean seekToNewSource(long targetPos)
+ throws IOException {
if (currentNode == null) {
return seekToBlockSource(targetPos);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 9ca8005..ccaf6a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -307,8 +307,8 @@ public class DFSStripedInputStream extends DFSInputStream {
stripeLimit - stripeBufOffset);
LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
- AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy, cellSize,
- blockGroup, offsetInBlockGroup,
+ AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
+ cellSize, blockGroup, offsetInBlockGroup,
offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
blockGroup, cellSize, dataBlkNum, parityBlkNum);
@@ -523,13 +523,13 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
@Override
protected void fetchBlockByteRange(LocatedBlock block, long start,
- long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
+ long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
// Refresh the striped block group
LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
- ecPolicy, cellSize, blockGroup, start, end, buf, offset);
+ ecPolicy, cellSize, blockGroup, start, end, buf);
CompletionService<Void> readService = new ExecutorCompletionService<>(
dfsClient.getStripedReadsThreadPool());
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
@@ -542,6 +542,7 @@ public class DFSStripedInputStream extends DFSInputStream {
blks, preaderInfos, corruptedBlocks);
preader.readStripe();
}
+ buf.position(buf.position() + (int)(end - start + 1));
} finally {
for (BlockReaderInfo preaderInfo : preaderInfos) {
closeReader(preaderInfo);
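The explicit position bump after readStripe() is needed because the stripe readers write through slices created with duplicate(): those share the destination's memory but never move the destination buffer's own position. In miniature:

import java.nio.ByteBuffer;

public class AdvanceAfterSlicesSketch {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(16);
    ByteBuffer slice = buf.duplicate().slice(); // window over buf's memory
    slice.put(new byte[16]);                    // fills buf's bytes...
    System.out.println(buf.position());         // ...but prints 0: buf never moved
    buf.position(buf.position() + 16);          // so the caller advances it once
  }
}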
@@ -698,16 +699,15 @@ public class DFSStripedInputStream extends DFSInputStream {
}
private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
- if (chunk.byteBuffer != null) {
- ByteBufferStrategy strategy =
- new ByteBufferStrategy(chunk.byteBuffer, readStatistics, dfsClient);
+ if (chunk.useByteBuffer()) {
+ ByteBufferStrategy strategy = new ByteBufferStrategy(
+ chunk.getByteBuffer(), readStatistics, dfsClient);
return new ByteBufferStrategy[]{strategy};
} else {
ByteBufferStrategy[] strategies =
- new ByteBufferStrategy[chunk.byteArray.getOffsets().length];
+ new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
for (int i = 0; i < strategies.length; i++) {
- ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
- chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
+ ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
strategies[i] =
new ByteBufferStrategy(buffer, readStatistics, dfsClient);
}
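The fan-out rule here is one read strategy per backing buffer: a chunk holding a single ByteBuffer yields one strategy, while a slice-backed chunk yields one per slice. A compact sketch of that shape, with Strategy as a hypothetical stand-in for ByteBufferStrategy (which additionally carries read statistics):

import java.nio.ByteBuffer;
import java.util.List;

public class StrategyFanOutSketch {
  static class Strategy {
    final ByteBuffer target;
    Strategy(ByteBuffer target) { this.target = target; }
  }

  static Strategy[] strategiesFor(ByteBuffer whole, List<ByteBuffer> slices) {
    if (whole != null) {                          // single-buffer chunk
      return new Strategy[]{ new Strategy(whole) };
    }
    Strategy[] out = new Strategy[slices.size()]; // slice-backed chunk
    for (int i = 0; i < out.length; i++) {
      out[i] = new Strategy(slices.get(i));
    }
    return out;
  }
}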
@@ -814,7 +814,7 @@ public class DFSStripedInputStream extends DFSInputStream {
}
class PositionStripeReader extends StripeReader {
- private byte[][] decodeInputs = null;
+ private ByteBuffer[] decodeInputs = null;
PositionStripeReader(CompletionService<Void> service,
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
@@ -836,8 +836,6 @@ public class DFSStripedInputStream extends DFSInputStream {
Preconditions.checkState(index >= dataBlkNum &&
alignedStripe.chunks[index] == null);
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
- alignedStripe.chunks[index].addByteArraySlice(0,
- (int) alignedStripe.getSpanInBlock());
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index c8827d9..4dbbc3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -73,7 +73,8 @@ import java.util.concurrent.TimeUnit;
@InterfaceAudience.Private
public class StripedBlockUtil {
- public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
+ public static final Logger LOG =
+ LoggerFactory.getLogger(StripedBlockUtil.class);
/**
* Parses a striped block group into individual blocks.
@@ -312,16 +313,17 @@ public class StripedBlockUtil {
* schedule a new fetch request with the decoding input buffer as transfer
* destination.
*/
- public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
+ public static ByteBuffer[] initDecodeInputs(AlignedStripe alignedStripe,
int dataBlkNum, int parityBlkNum) {
- byte[][] decodeInputs =
- new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
+ ByteBuffer[] decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+ for (int i = 0; i < decodeInputs.length; i++) {
+ decodeInputs[i] = ByteBuffer.allocate(
+ (int) alignedStripe.getSpanInBlock());
+ }
// read the full data aligned stripe
for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] == null) {
alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
- alignedStripe.chunks[i].addByteArraySlice(0,
- (int) alignedStripe.getSpanInBlock());
}
}
return decodeInputs;
@@ -334,14 +336,21 @@ public class StripedBlockUtil {
* When all pending requests have returned, this method should be called to
* finalize decode input buffers.
*/
- public static void finalizeDecodeInputs(final byte[][] decodeInputs,
+ public static void finalizeDecodeInputs(final ByteBuffer[] decodeInputs,
AlignedStripe alignedStripe) {
for (int i = 0; i < alignedStripe.chunks.length; i++) {
final StripingChunk chunk = alignedStripe.chunks[i];
if (chunk != null && chunk.state == StripingChunk.FETCHED) {
- chunk.copyTo(decodeInputs[i]);
+ if (chunk.useChunkBuffer()) {
+ chunk.getChunkBuffer().copyTo(decodeInputs[i]);
+ } else {
+ chunk.getByteBuffer().flip();
+ }
} else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
- Arrays.fill(decodeInputs[i], (byte) 0);
+ // Zero it. This will be handled better in a follow-up issue.
+ byte[] emptyBytes = new byte[decodeInputs[i].limit()];
+ decodeInputs[i].put(emptyBytes);
+ decodeInputs[i].flip();
} else {
decodeInputs[i] = null;
}
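The invariant finalizeDecodeInputs establishes is that every surviving decode input ends up in the ready-to-read state the decoder expects: a fetched buffer is flipped (its position already equals the bytes read), and an ALLZERO unit is zero-filled and flipped so it looks identical. A small sketch of both cases:

import java.nio.ByteBuffer;

public class FinalizeInputsSketch {
  public static void main(String[] args) {
    ByteBuffer fetched = ByteBuffer.allocate(8);
    fetched.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8}); // a reader filled it
    fetched.flip();                                  // ready to decode

    ByteBuffer allZero = ByteBuffer.allocate(8);
    allZero.put(new byte[allZero.limit()]);          // explicit zero fill
    allZero.flip();                                  // same read-ready state

    System.out.println(fetched.remaining() + " " + allZero.remaining()); // 8 8
  }
}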
@@ -351,7 +360,7 @@ public class StripedBlockUtil {
/**
* Decode based on the given input buffers and erasure coding policy.
*/
- public static void decodeAndFillBuffer(final byte[][] decodeInputs,
+ public static void decodeAndFillBuffer(final ByteBuffer[] decodeInputs,
AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
RawErasureDecoder decoder) {
// Step 1: prepare indices and output buffers for missing data units
@@ -364,8 +373,11 @@ public class StripedBlockUtil {
}
}
decodeIndices = Arrays.copyOf(decodeIndices, pos);
- byte[][] decodeOutputs =
- new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()];
+ ByteBuffer[] decodeOutputs = new ByteBuffer[decodeIndices.length];
+ for (int i = 0; i < decodeOutputs.length; i++) {
+ decodeOutputs[i] = ByteBuffer.allocate(
+ (int) alignedStripe.getSpanInBlock());
+ }
// Step 2: decode into prepared output buffers
decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
@@ -374,8 +386,8 @@ public class StripedBlockUtil {
for (int i = 0; i < decodeIndices.length; i++) {
int missingBlkIdx = decodeIndices[i];
StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
- if (chunk.state == StripingChunk.MISSING) {
- chunk.copyFrom(decodeOutputs[i]);
+ if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
+ chunk.getChunkBuffer().copyFrom(decodeOutputs[i]);
}
}
}
@@ -402,7 +414,8 @@ public class StripedBlockUtil {
// Step 4: calculate each chunk's position in destination buffer. Since the
// whole read range is within a single stripe, the logic is simpler here.
- int bufOffset = (int) (rangeStartInBlockGroup % ((long) cellSize * dataBlkNum));
+ int bufOffset =
+ (int) (rangeStartInBlockGroup % ((long) cellSize * dataBlkNum));
for (StripingCell cell : cells) {
long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
long cellEnd = cellStart + cell.size - 1;
@@ -437,15 +450,14 @@ public class StripedBlockUtil {
* @param rangeStartInBlockGroup The byte range's start offset in block group
* @param rangeEndInBlockGroup The byte range's end offset in block group
* @param buf Destination buffer of the read operation for the byte range
- * @param offsetInBuf Start offset into the destination buffer
*
* At most 5 stripes will be generated from each logical range, as
* demonstrated in the header of {@link AlignedStripe}.
*/
- public static AlignedStripe[] divideByteRangeIntoStripes(ErasureCodingPolicy ecPolicy,
+ public static AlignedStripe[] divideByteRangeIntoStripes(
+ ErasureCodingPolicy ecPolicy,
int cellSize, LocatedStripedBlock blockGroup,
- long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
- int offsetInBuf) {
+ long rangeStartInBlockGroup, long rangeEndInBlockGroup, ByteBuffer buf) {
// Step 0: analyze range and calculate basic parameters
final int dataBlkNum = ecPolicy.getNumDataUnits();
@@ -462,7 +474,7 @@ public class StripedBlockUtil {
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
// Step 4: calculate each chunk's position in destination buffer
- calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf);
+ calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
// Step 5: prepare ALLZERO blocks
prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
@@ -476,7 +488,8 @@ public class StripedBlockUtil {
* used by {@link DFSStripedOutputStream} in encoding
*/
@VisibleForTesting
- private static StripingCell[] getStripingCellsOfByteRange(ErasureCodingPolicy ecPolicy,
+ private static StripingCell[] getStripingCellsOfByteRange(
+ ErasureCodingPolicy ecPolicy,
int cellSize, LocatedStripedBlock blockGroup,
long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
Preconditions.checkArgument(
@@ -511,7 +524,8 @@ public class StripedBlockUtil {
* the physical byte range (inclusive) on each stored internal block.
*/
@VisibleForTesting
- private static VerticalRange[] getRangesForInternalBlocks(ErasureCodingPolicy ecPolicy,
+ private static VerticalRange[] getRangesForInternalBlocks(
+ ErasureCodingPolicy ecPolicy,
int cellSize, StripingCell[] cells) {
int dataBlkNum = ecPolicy.getNumDataUnits();
int parityBlkNum = ecPolicy.getNumParityUnits();
@@ -575,8 +589,7 @@ public class StripedBlockUtil {
}
private static void calcualteChunkPositionsInBuf(int cellSize,
- AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
- int offsetInBuf) {
+ AlignedStripe[] stripes, StripingCell[] cells, ByteBuffer buf) {
/**
* | <--------------- AlignedStripe --------------->|
*
@@ -598,6 +611,7 @@ public class StripedBlockUtil {
for (StripingCell cell : cells) {
long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
long cellEnd = cellStart + cell.size - 1;
+ StripingChunk chunk;
for (AlignedStripe s : stripes) {
long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
@@ -606,11 +620,13 @@ public class StripedBlockUtil {
if (overLapLen <= 0) {
continue;
}
- if (s.chunks[cell.idxInStripe] == null) {
- s.chunks[cell.idxInStripe] = new StripingChunk(buf);
+ chunk = s.chunks[cell.idxInStripe];
+ if (chunk == null) {
+ chunk = new StripingChunk();
+ s.chunks[cell.idxInStripe] = chunk;
}
- s.chunks[cell.idxInStripe].addByteArraySlice(
- (int)(offsetInBuf + done + overlapStart - cellStart), overLapLen);
+ chunk.getChunkBuffer().addSlice(buf,
+ (int) (done + overlapStart - cellStart), overLapLen);
}
done += cell.size;
}
@@ -833,88 +849,89 @@ public class StripedBlockUtil {
*/
public int state = REQUESTED;
- public final ChunkByteArray byteArray;
- public final ByteBuffer byteBuffer;
+ private final ChunkByteBuffer chunkBuffer;
+ private final ByteBuffer byteBuffer;
- public StripingChunk(byte[] buf) {
- this.byteArray = new ChunkByteArray(buf);
+ public StripingChunk() {
+ this.chunkBuffer = new ChunkByteBuffer();
byteBuffer = null;
}
public StripingChunk(ByteBuffer buf) {
- this.byteArray = null;
+ this.chunkBuffer = null;
this.byteBuffer = buf;
}
public StripingChunk(int state) {
- this.byteArray = null;
+ this.chunkBuffer = null;
this.byteBuffer = null;
this.state = state;
}
- public void addByteArraySlice(int offset, int length) {
- assert byteArray != null;
- byteArray.offsetsInBuf.add(offset);
- byteArray.lengthsInBuf.add(length);
+ public boolean useByteBuffer(){
+ return byteBuffer != null;
}
- void copyTo(byte[] target) {
- assert byteArray != null;
- byteArray.copyTo(target);
+ public boolean useChunkBuffer() {
+ return chunkBuffer != null;
}
- void copyFrom(byte[] src) {
- assert byteArray != null;
- byteArray.copyFrom(src);
+ public ByteBuffer getByteBuffer() {
+ assert byteBuffer != null;
+ return byteBuffer;
+ }
+
+ public ChunkByteBuffer getChunkBuffer() {
+ assert chunkBuffer != null;
+ return chunkBuffer;
}
}
- public static class ChunkByteArray {
- private final byte[] buf;
- private final List<Integer> offsetsInBuf;
- private final List<Integer> lengthsInBuf;
+ /**
+ * A utility to manage ByteBuffer slices for a reader.
+ */
+ public static class ChunkByteBuffer {
+ private final List<ByteBuffer> slices;
- ChunkByteArray(byte[] buf) {
- this.buf = buf;
- this.offsetsInBuf = new ArrayList<>();
- this.lengthsInBuf = new ArrayList<>();
+ ChunkByteBuffer() {
+ this.slices = new ArrayList<>();
}
- public int[] getOffsets() {
- int[] offsets = new int[offsetsInBuf.size()];
- for (int i = 0; i < offsets.length; i++) {
- offsets[i] = offsetsInBuf.get(i);
- }
- return offsets;
+ public void addSlice(ByteBuffer buffer, int offset, int len) {
+ ByteBuffer tmp = buffer.duplicate();
+ tmp.position(buffer.position() + offset);
+ tmp.limit(buffer.position() + offset + len);
+ slices.add(tmp.slice());
}
- public int[] getLengths() {
- int[] lens = new int[this.lengthsInBuf.size()];
- for (int i = 0; i < lens.length; i++) {
- lens[i] = this.lengthsInBuf.get(i);
- }
- return lens;
+ public ByteBuffer getSlice(int i) {
+ return slices.get(i);
}
- public byte[] buf() {
- return buf;
+ public List<ByteBuffer> getSlices() {
+ return slices;
}
- void copyTo(byte[] target) {
- int posInBuf = 0;
- for (int i = 0; i < offsetsInBuf.size(); i++) {
- System.arraycopy(buf, offsetsInBuf.get(i),
- target, posInBuf, lengthsInBuf.get(i));
- posInBuf += lengthsInBuf.get(i);
+ /**
+ * Note: target will be in a ready-to-read state after the call.
+ */
+ void copyTo(ByteBuffer target) {
+ for (ByteBuffer slice : slices) {
+ slice.flip();
+ target.put(slice);
}
- }
-
- void copyFrom(byte[] src) {
- int srcPos = 0;
- for (int j = 0; j < offsetsInBuf.size(); j++) {
- System.arraycopy(src, srcPos, buf, offsetsInBuf.get(j),
- lengthsInBuf.get(j));
- srcPos += lengthsInBuf.get(j);
+ target.flip();
+ }
+
+ void copyFrom(ByteBuffer src) {
+ ByteBuffer tmp;
+ int len;
+ for (ByteBuffer slice : slices) {
+ len = slice.remaining();
+ tmp = src.duplicate();
+ tmp.limit(tmp.position() + len);
+ slice.put(tmp);
+ src.position(src.position() + len);
}
}
}
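ChunkByteBuffer is the heart of this change: addSlice() captures zero-copy windows into the shared destination buffer via duplicate()/slice(), so concurrent readers can fill disjoint regions without coordinating array offsets. A condensed, self-contained sketch of the pattern (the slice arithmetic mirrors addSlice above; everything else is illustrative):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ChunkByteBufferSketch {
  private final List<ByteBuffer> slices = new ArrayList<>();

  void addSlice(ByteBuffer buffer, int offset, int len) {
    ByteBuffer tmp = buffer.duplicate();  // shares bytes, not indices
    tmp.position(buffer.position() + offset);
    tmp.limit(buffer.position() + offset + len);
    slices.add(tmp.slice());              // zero-based window into buffer
  }

  public static void main(String[] args) {
    ByteBuffer dest = ByteBuffer.allocate(32);
    ChunkByteBufferSketch chunk = new ChunkByteBufferSketch();
    chunk.addSlice(dest, 0, 8);           // one cell's overlap
    chunk.addSlice(dest, 16, 8);          // a later cell's overlap
    chunk.slices.get(0).put(new byte[8]); // one reader fills slice 0
    chunk.slices.get(1).put(new byte[8]); // another fills slice 1
    System.out.println(dest.position());  // prints 0: dest itself never moved
  }
}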
http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 18c2de9..1e27745 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -57,7 +57,8 @@ import static org.junit.Assert.assertTrue;
public class TestDFSStripedInputStream {
- public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
+ public static final Log LOG =
+ LogFactory.getLog(TestDFSStripedInputStream.class);
private MiniDFSCluster cluster;
private Configuration conf = new Configuration();
@@ -272,12 +273,16 @@ public class TestDFSStripedInputStream {
// |10 |
done += in.read(0, readBuffer, 0, delta);
assertEquals(delta, done);
+ assertArrayEquals(Arrays.copyOf(expected, done),
+ Arrays.copyOf(readBuffer, done));
// both head and trail cells are partial
// |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 |
// |256K - 10|missing|256K|256K|256K - 10|not in range|
done += in.read(delta, readBuffer, delta,
CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
+ assertArrayEquals(Arrays.copyOf(expected, done),
+ Arrays.copyOf(readBuffer, done));
// read the rest
done += in.read(done, readBuffer, done, readSize - done);
assertEquals(readSize, done);
@@ -291,8 +296,8 @@ public class TestDFSStripedInputStream {
testStatefulRead(true, true);
}
- private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
- throws Exception {
+ private void testStatefulRead(boolean useByteBuffer,
+ boolean cellMisalignPacket) throws Exception {
final int numBlocks = 2;
final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
if (cellMisalignPacket) {
@@ -302,7 +307,8 @@ public class TestDFSStripedInputStream {
}
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
- LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(filePath.toString(), 0, fileSize);
+ LocatedBlocks lbs = fs.getClient().namenode.
+ getBlockLocations(filePath.toString(), 0, fileSize);
assert lbs.getLocatedBlocks().size() == numBlocks;
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
@@ -360,4 +366,111 @@ public class TestDFSStripedInputStream {
}
fs.delete(filePath, true);
}
+
+ @Test
+ public void testStatefulReadWithDNFailure() throws Exception {
+ final int numBlocks = 4;
+ final int failedDNIdx = DATA_BLK_NUM - 1;
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ NUM_STRIPE_PER_BLOCK, false);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCK_GROUP_SIZE);
+
+ assert lbs.get(0) instanceof LocatedStripedBlock;
+ LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+ for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
+ Block blk = new Block(bg.getBlock().getBlockId() + i,
+ NUM_STRIPE_PER_BLOCK * CELLSIZE,
+ bg.getBlock().getGenerationStamp());
+ blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+ cluster.injectBlocks(i, Arrays.asList(blk),
+ bg.getBlock().getBlockPoolId());
+ }
+ DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
+ ecPolicy, null);
+ int readSize = BLOCK_GROUP_SIZE;
+ byte[] readBuffer = new byte[readSize];
+ byte[] expected = new byte[readSize];
+ /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ for (int k = 0; k < CELLSIZE; k++) {
+ int posInBlk = i * CELLSIZE + k;
+ int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+ expected[posInFile] = SimulatedFSDataset.simulatedByte(
+ new Block(bg.getBlock().getBlockId() + j), posInBlk);
+ }
+ }
+ }
+
+ ErasureCoderOptions coderOptions = new ErasureCoderOptions(
+ DATA_BLK_NUM, PARITY_BLK_NUM);
+ RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(conf,
+ ecPolicy.getCodecName(), coderOptions);
+
+ // Update the expected content for decoded data
+ int[] missingBlkIdx = new int[PARITY_BLK_NUM];
+ for (int i = 0; i < missingBlkIdx.length; i++) {
+ if (i == 0) {
+ missingBlkIdx[i] = failedDNIdx;
+ } else {
+ missingBlkIdx[i] = DATA_BLK_NUM + i;
+ }
+ }
+ cluster.stopDataNode(failedDNIdx);
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+ byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
+ byte[][] decodeOutputs = new byte[missingBlkIdx.length][CELLSIZE];
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
+ if (j != failedDNIdx) {
+ System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
+ }
+ }
+ for (int j = DATA_BLK_NUM; j < DATA_BLK_NUM + PARITY_BLK_NUM; j++) {
+ for (int k = 0; k < CELLSIZE; k++) {
+ int posInBlk = i * CELLSIZE + k;
+ decodeInputs[j][k] = SimulatedFSDataset.simulatedByte(
+ new Block(bg.getBlock().getBlockId() + j), posInBlk);
+ }
+ }
+ for (int m : missingBlkIdx) {
+ decodeInputs[m] = null;
+ }
+ rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+ int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
+ System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
+ }
+
+ int delta = 10;
+ int done = 0;
+ // read a small delta, shouldn't trigger decode
+ // |cell_0 |
+ // |10 |
+ done += in.read(readBuffer, 0, delta);
+ assertEquals(delta, done);
+ // both head and trail cells are partial
+ // |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 |
+ // |256K - 10|missing|256K|256K|256K - 10|not in range|
+ while (done < (CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta)) {
+ int ret = in.read(readBuffer, delta,
+ CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
+ assertTrue(ret > 0);
+ done += ret;
+ }
+ assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
+ // read the rest
+
+ int restSize = readSize - done;
+ while (done < restSize) {
+ int ret = in.read(readBuffer, done, restSize);
+ assertTrue(ret > 0);
+ done += ret;
+ }
+
+ assertEquals(readSize, done);
+ assertArrayEquals(expected, readBuffer);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/401db4fc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
index 96fc79c..7d9d7dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
@@ -36,6 +36,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import java.nio.ByteBuffer;
import java.util.Random;
import static org.junit.Assert.assertEquals;
@@ -242,7 +243,8 @@ public class TestStripedBlockUtil {
*/
@Test
public void testDivideByteRangeIntoStripes() {
- byte[] assembled = new byte[BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE];
+ ByteBuffer assembled =
+ ByteBuffer.allocate(BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE);
for (int bgSize : blockGroupSizes) {
LocatedStripedBlock blockGroup = createDummyLocatedBlock(bgSize);
byte[][] internalBlkBufs = createInternalBlkBuffers(bgSize);
@@ -252,7 +254,7 @@ public class TestStripedBlockUtil {
continue;
}
AlignedStripe[] stripes = divideByteRangeIntoStripes(EC_POLICY,
- CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled, 0);
+ CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled);
for (AlignedStripe stripe : stripes) {
for (int i = 0; i < DATA_BLK_NUM; i++) {
@@ -261,21 +263,21 @@ public class TestStripedBlockUtil {
continue;
}
int done = 0;
- for (int j = 0; j < chunk.byteArray.getLengths().length; j++) {
- System.arraycopy(internalBlkBufs[i],
- (int) stripe.getOffsetInBlock() + done, assembled,
- chunk.byteArray.getOffsets()[j],
- chunk.byteArray.getLengths()[j]);
- done += chunk.byteArray.getLengths()[j];
+ int len;
+ for (ByteBuffer slice : chunk.getChunkBuffer().getSlices()) {
+ len = slice.remaining();
+ slice.put(internalBlkBufs[i],
+ (int) stripe.getOffsetInBlock() + done, len);
+ done += len;
}
}
}
for (int i = 0; i < brSize; i++) {
- if (hashIntToByte(brStart + i) != assembled[i]) {
+ if (hashIntToByte(brStart + i) != assembled.get(i)) {
System.out.println("Oops");
}
assertEquals("Byte at " + (brStart + i) + " should be the same",
- hashIntToByte(brStart + i), assembled[i]);
+ hashIntToByte(brStart + i), assembled.get(i));
}
}
}