Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/17 01:58:50 UTC
[22/50] hadoop git commit: HDFS-8281. Erasure Coding: implement parallel stateful reading for striped layout. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67a22d24
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67a22d24
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67a22d24
Branch: refs/heads/HDFS-7285
Commit: 67a22d24cfec08602344766b5eb0d15ef82bf79d
Parents: 4bae0b1
Author: Jing Zhao <ji...@apache.org>
Authored: Mon May 4 14:44:58 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:16:04 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 26 +++
.../hadoop/hdfs/DFSStripedInputStream.java | 217 +++++++++++++------
.../hadoop/hdfs/util/StripedBlockUtil.java | 34 ++-
.../hadoop/hdfs/TestDFSStripedInputStream.java | 50 ++++-
.../hadoop/hdfs/TestPlanReadPortions.java | 4 +-
6 files changed, 246 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index e30b2ed..77272e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -161,3 +161,6 @@
HDFS-8316. Erasure coding: refactor EC constants to be consistent with HDFS-8249.
(Zhe Zhang via jing9)
+
+ HDFS-8281. Erasure Coding: implement parallel stateful reading for striped layout.
+ (jing9)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index bef4da0..ca799fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -716,6 +716,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
interface ReaderStrategy {
public int doRead(BlockReader blockReader, int off, int len)
throws ChecksumException, IOException;
+
+ /**
+ * Copy data from the src ByteBuffer into the read buffer.
+ * @param src The src buffer where the data is copied from
+ * @param offset Useful only when the ReadStrategy is based on a byte array.
+ * Indicate the offset of the byte array for copy.
+ * @param length Useful only when the ReadStrategy is based on a byte array.
+ * Indicate the length of the data to copy.
+ */
+ public int copyFrom(ByteBuffer src, int offset, int length);
}
protected void updateReadStatistics(ReadStatistics readStatistics,
@@ -749,6 +759,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
updateReadStatistics(readStatistics, nRead, blockReader);
return nRead;
}
+
+ @Override
+ public int copyFrom(ByteBuffer src, int offset, int length) {
+ ByteBuffer writeSlice = src.duplicate();
+ writeSlice.get(buf, offset, length);
+ return length;
+ }
}
/**
@@ -782,6 +799,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
}
+
+ @Override
+ public int copyFrom(ByteBuffer src, int offset, int length) {
+ ByteBuffer writeSlice = src.duplicate();
+ int remaining = Math.min(buf.remaining(), writeSlice.remaining());
+ writeSlice.limit(writeSlice.position() + remaining);
+ buf.put(writeSlice);
+ return remaining;
+ }
}
/* This is used by regular read() and handles ChecksumExceptions.
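For reference, a minimal standalone sketch (class and variable names are ours, not part of the patch) of the duplicate-then-copy idiom both copyFrom implementations above rely on: duplicating the source ByteBuffer gives an independent position/limit, so draining the duplicate never disturbs the shared stripe buffer's cursor.

import java.nio.ByteBuffer;

public class CopyFromDemo {
  public static void main(String[] args) {
    ByteBuffer src = ByteBuffer.wrap("striped".getBytes());

    // Byte-array flavor: drain into a byte[] at a caller-supplied offset,
    // as ByteArrayStrategy.copyFrom does.
    byte[] dst = new byte[16];
    ByteBuffer writeSlice = src.duplicate(); // same content, independent cursor
    writeSlice.get(dst, 2, 7);               // copy 7 bytes into dst starting at 2

    // ByteBuffer flavor: cap the duplicate's limit so the put never
    // overflows the destination, as ByteBufferStrategy.copyFrom does.
    ByteBuffer dstBuf = ByteBuffer.allocate(4);
    ByteBuffer slice2 = src.duplicate();
    int remaining = Math.min(dstBuf.remaining(), slice2.remaining());
    slice2.limit(slice2.position() + remaining);
    dstBuf.put(slice2);

    System.out.println(src.position()); // 0: the source cursor never moved
  }
}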
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 0dc98fd..13c4743 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
@@ -37,6 +38,7 @@ import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CancellationException;
@@ -62,7 +64,7 @@ import java.util.concurrent.Future;
* +------+ <- A cell contains {@link #cellSize} bytes of data
*
* Three styles of read will eventually be supported:
- * 1. Stateful read: TODO: HDFS-8033
+ * 1. Stateful read
* 2. pread without decode support
* This is implemented by calculating the portion of read from each block and
* issuing requests to each DataNode in parallel.
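To make the round-robin cell layout described in this class comment concrete, here is a hedged sketch (the helper name locateCell is hypothetical, not in the patch) of how an offset inside a block group maps onto an internal block and a cell; the same arithmetic drives readOneStripe and planReadPortions below.

// With cellSize-byte cells striped across dataBlkNum data blocks, offset o
// in the block group lands in cell (o / cellSize), which lives on block
// (o / cellSize) % dataBlkNum as that block's cell number
// o / (cellSize * dataBlkNum).
static int[] locateCell(long offsetInGroup, int cellSize, int dataBlkNum) {
  int blkIdxInGroup = (int) (offsetInGroup / cellSize % dataBlkNum);
  int cellIdxInBlk = (int) (offsetInGroup / ((long) cellSize * dataBlkNum));
  return new int[] { blkIdxInGroup, cellIdxInBlk };
}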
@@ -91,12 +93,38 @@ public class DFSStripedInputStream extends DFSInputStream {
}
}
+ /** Used to indicate the buffered data's range in the block group */
+ private static class StripeRange {
+ /** start offset in the block group (inclusive) */
+ final long offsetInBlock;
+ /** length of the stripe range */
+ final long length;
+
+ StripeRange(long offsetInBlock, long length) {
+ Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+ this.offsetInBlock = offsetInBlock;
+ this.length = length;
+ }
+
+ boolean include(long pos) {
+ return pos >= offsetInBlock && pos < offsetInBlock + length;
+ }
+ }
+
private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
private final BlockReader[] blockReaders = new BlockReader[groupSize];
private final DatanodeInfo[] currentNodes = new DatanodeInfo[groupSize];
private final int cellSize;
private final short dataBlkNum;
private final short parityBlkNum;
+ /** the buffer for a complete stripe */
+ private ByteBuffer curStripeBuf;
+ /**
+ * indicate the start/end offset of the current buffered stripe in the
+ * block group
+ */
+ private StripeRange curStripeRange;
+ private final CompletionService<Integer> readingService;
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
ECInfo ecInfo) throws IOException {
@@ -106,7 +134,20 @@ public class DFSStripedInputStream extends DFSInputStream {
cellSize = ecInfo.getSchema().getChunkSize();
dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();
- DFSClient.LOG.debug("Creating a striped input stream for file " + src);
+ curStripeRange = new StripeRange(0, 0);
+ readingService =
+ new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Creating a striped input stream for file " + src);
+ }
+ }
+
+ private void resetCurStripeBuffer() {
+ if (curStripeBuf == null) {
+ curStripeBuf = ByteBuffer.allocateDirect(cellSize * dataBlkNum);
+ }
+ curStripeBuf.clear();
+ curStripeRange = new StripeRange(0, 0);
}
@Override
@@ -141,7 +182,7 @@ public class DFSStripedInputStream extends DFSInputStream {
targetBlockGroup.getBlockSize() - 1;
currentLocatedBlock = targetBlockGroup;
- long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
+ final long offsetIntoBlockGroup = getOffsetInBlockGroup();
LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
// The purpose is to get start offset into each block
@@ -156,8 +197,8 @@ public class DFSStripedInputStream extends DFSInputStream {
if (retval != null) {
currentNodes[i] = retval.info;
blockReaders[i] = getBlockReaderWithRetry(targetBlock,
- readPortions[i].startOffsetInBlock,
- targetBlock.getBlockSize() - readPortions[i].startOffsetInBlock,
+ readPortions[i].getStartOffsetInBlock(),
+ targetBlock.getBlockSize() - readPortions[i].getStartOffsetInBlock(),
retval.addr, retval.storageType, retval.info, target, retry);
}
}
@@ -203,6 +244,7 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
@Override
protected void closeCurrentBlockReaders() {
+ resetCurStripeBuffer();
if (blockReaders == null || blockReaders.length == 0) {
return;
}
@@ -220,6 +262,73 @@ public class DFSStripedInputStream extends DFSInputStream {
blockEnd = -1;
}
+ private long getOffsetInBlockGroup() {
+ return pos - currentLocatedBlock.getStartOffset();
+ }
+
+ /**
+ * Read a new stripe covering the current position, and store the data in the
+ * {@link #curStripeBuf}.
+ */
+ private void readOneStripe(
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ resetCurStripeBuffer();
+
+ // compute stripe range based on pos
+ final long offsetInBlockGroup = getOffsetInBlockGroup();
+ final long stripeLen = cellSize * dataBlkNum;
+ int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
+ curStripeRange = new StripeRange(stripeIndex * stripeLen,
+ Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen),
+ stripeLen));
+ final int numCell = (int) ((curStripeRange.length - 1) / cellSize + 1);
+
+ // read the whole stripe in parallel
+ Map<Future<Integer>, Integer> futures = new HashMap<>();
+ for (int i = 0; i < numCell; i++) {
+ curStripeBuf.position(cellSize * i);
+ curStripeBuf.limit((int) Math.min(cellSize * (i + 1),
+ curStripeRange.length));
+ ByteBuffer buf = curStripeBuf.slice();
+ ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
+ final int targetLength = buf.remaining();
+ Callable<Integer> readCallable = readCell(blockReaders[i],
+ currentNodes[i], strategy, targetLength, corruptedBlockMap);
+ Future<Integer> request = readingService.submit(readCallable);
+ futures.put(request, i);
+ }
+ while (!futures.isEmpty()) {
+ try {
+ waitNextCompletion(readingService, futures);
+ // TODO: decode and record bad reader if necessary
+ } catch (InterruptedException ignored) {
+ // ignore and retry
+ }
+ }
+ }
+
+ private Callable<Integer> readCell(final BlockReader reader,
+ final DatanodeInfo datanode, final ByteBufferStrategy strategy,
+ final int targetLength,
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+ return new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ int result = 0;
+ while (result < targetLength) {
+ int ret = readBuffer(reader, datanode, strategy, corruptedBlockMap);
+ if (ret < 0) {
+ throw new IOException("Unexpected EOS from the reader");
+ }
+ result += ret;
+ }
+ updateReadStatistics(readStatistics, targetLength, reader);
+ return result;
+ }
+ };
+ }
+
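The submit-and-drain cycle above is the heart of the parallel stateful read: one Callable per cell of the stripe, and every outstanding future drained before the stripe buffer is handed to the caller. A self-contained model of that cycle (all names hypothetical; fake payloads stand in for BlockReader cell reads):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

public class StripeReadModel {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletionService<Integer> readingService =
        new ExecutorCompletionService<>(pool);

    // Submit one "cell read" per data block, as readOneStripe does.
    int numCell = 6;
    Map<Future<Integer>, Integer> futures = new HashMap<>();
    for (int i = 0; i < numCell; i++) {
      final int cell = i;
      futures.put(readingService.submit(() -> cell * 65536), cell);
    }

    // Drain completions in finish order; readOneStripe loops like this
    // until every cell of the stripe has landed in curStripeBuf.
    while (!futures.isEmpty()) {
      Future<Integer> done = readingService.take(); // blocks for next result
      futures.remove(done);
    }
    pool.shutdown();
  }
}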
@Override
protected synchronized int readWithStrategy(ReaderStrategy strategy,
int off, int len) throws IOException {
@@ -227,11 +336,10 @@ public class DFSStripedInputStream extends DFSInputStream {
if (closed.get()) {
throw new IOException("Stream closed");
}
- Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
+ new ConcurrentHashMap<>();
failures = 0;
if (pos < getFileLength()) {
- /** Index of the target block in a stripe to read from */
- int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
try {
if (pos > blockEnd) {
blockSeekTo(pos);
@@ -247,40 +355,13 @@ public class DFSStripedInputStream extends DFSInputStream {
/** Number of bytes already read into buffer */
int result = 0;
while (result < realLen) {
- /**
- * Temporary position into the file; {@link pos} might not proceed
- * to this temporary position in case of exceptions.
- */
- long tmpPos = pos + result;
- /** Start and end offsets of a cell in the file */
- long cellStart = (tmpPos / cellSize) * cellSize;
- long cellEnd = cellStart + cellSize - 1;
-
- /** Number of bytes to read from the current cell */
- int realLenInCell = (int) Math.min(realLen - result,
- cellEnd - tmpPos + 1L);
- assert realLenInCell > 0 : "Temporary position shouldn't be "
- + "after cellEnd";
-
- // Read from one blockReader up to cell boundary
- int cellRet = readBuffer(blockReaders[idxInGroup],
- currentNodes[idxInGroup], strategy, off + result, realLenInCell,
- corruptedBlockMap);
- if (cellRet >= 0) {
- result += cellRet;
- if (cellRet < realLenInCell) {
- // A short read indicates the current blockReader buffer is
- // already drained. Should return the read call. Otherwise
- // should proceed to the next cell.
- break;
- }
- } else {
- // got a EOS from reader though we expect more data on it.
- throw new IOException("Unexpected EOS from the reader");
+ if (!curStripeRange.include(getOffsetInBlockGroup())) {
+ readOneStripe(corruptedBlockMap);
}
- idxInGroup = (idxInGroup + 1) % dataBlkNum;
+ int ret = copy(strategy, off + result, realLen - result);
+ result += ret;
+ pos += ret;
}
- pos += result;
if (dfsClient.stats != null) {
dfsClient.stats.incrementBytesRead(result);
}
@@ -295,11 +376,11 @@ public class DFSStripedInputStream extends DFSInputStream {
return -1;
}
- private synchronized int readBuffer(BlockReader blockReader,
- DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len,
+ private int readBuffer(BlockReader blockReader,
+ DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
try {
- return readerStrategy.doRead(blockReader, off, len);
+ return readerStrategy.doRead(blockReader, 0, 0);
} catch ( ChecksumException ce ) {
DFSClient.LOG.warn("Found Checksum error for "
+ getCurrentBlock() + " from " + currentNode
@@ -312,26 +393,25 @@ public class DFSStripedInputStream extends DFSInputStream {
+ getCurrentBlock() + " of " + src + " from "
+ currentNode, e);
}
- // TODO: this should trigger decoding logic (HDFS-7678)
return -1;
}
- protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
- ByteBufferStrategy(ByteBuffer buf) {
- super(buf);
- }
-
- @Override
- public int doRead(BlockReader blockReader, int off, int len)
- throws IOException {
- int oldlimit = buf.limit();
- if (buf.remaining() > len) {
- buf.limit(buf.position() + len);
- }
- int ret = super.doRead(blockReader, off, len);
- buf.limit(oldlimit);
- return ret;
- }
+ /**
+ * Copy the data from {@link #curStripeBuf} into the given buffer
+ * @param strategy the ReaderStrategy containing the given buffer
+ * @param offset the offset of the given buffer. Used only when strategy is
+ * a ByteArrayStrategy
+ * @param length target length
+ * @return number of bytes copied
+ */
+ private int copy(ReaderStrategy strategy, int offset, int length) {
+ final long stripeLen = cellSize * dataBlkNum;
+ final long offsetInBlk = pos - currentLocatedBlock.getStartOffset();
+ // compute the position in the curStripeBuf based on "pos"
+ int bufOffset = (int) (offsetInBlk % stripeLen);
+ curStripeBuf.position(bufOffset);
+ return strategy.copyFrom(curStripeBuf, offset,
+ Math.min(length, curStripeBuf.remaining()));
}
/**
@@ -366,8 +446,7 @@ public class DFSStripedInputStream extends DFSInputStream {
DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
+ blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
}
- return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize,
- dataBlkNum, idx);
+ return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize, dataBlkNum, idx);
}
private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
@@ -404,7 +483,7 @@ public class DFSStripedInputStream extends DFSInputStream {
for (short i = 0; i < dataBlkNum; i++) {
ReadPortion rp = readPortions[i];
- if (rp.readLength <= 0) {
+ if (rp.getReadLength() <= 0) {
continue;
}
DatanodeInfo loc = blks[i].getLocations()[0];
@@ -413,8 +492,8 @@ public class DFSStripedInputStream extends DFSInputStream {
loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
type);
Callable<Void> readCallable = getFromOneDataNode(dnAddr,
- blks[i].getStartOffset(), rp.startOffsetInBlock,
- rp.startOffsetInBlock + rp.readLength - 1, buf,
+ blks[i].getStartOffset(), rp.getStartOffsetInBlock(),
+ rp.getStartOffsetInBlock() + rp.getReadLength() - 1, buf,
rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
@@ -451,14 +530,14 @@ public class DFSStripedInputStream extends DFSInputStream {
};
}
- private void waitNextCompletion(CompletionService<Void> stripedReadsService,
- Map<Future<Void>, Integer> futures) throws InterruptedException {
+ private <T> void waitNextCompletion(CompletionService<T> service,
+ Map<Future<T>, Integer> futures) throws InterruptedException {
if (futures.isEmpty()) {
throw new InterruptedException("Futures already empty");
}
- Future<Void> future = null;
+ Future<T> future = null;
try {
- future = stripedReadsService.take();
+ future = service.take();
future.get();
futures.remove(future);
} catch (ExecutionException | CancellationException e) {
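One detail in this file worth a worked example is the position arithmetic in copy(): the stream position is translated into an offset inside the buffered stripe by reducing it modulo the stripe length. A tiny runnable check with hypothetical numbers (the cellSize and dataBlkNum values are illustrative, not mandated by the patch):

public class CopyOffsetDemo {
  public static void main(String[] args) {
    final long cellSize = 64 * 1024;      // 64KB cells (illustrative)
    final long dataBlkNum = 6;            // 6 data blocks => 384KB stripes
    final long stripeLen = cellSize * dataBlkNum;
    final long offsetInBlk = 500 * 1024;  // stream pos inside the block group
    // copy() positions curStripeBuf at pos's offset within the current stripe:
    int bufOffset = (int) (offsetInBlk % stripeLen);
    System.out.println(bufOffset);        // 118784, i.e. 116KB into the stripe
  }
}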
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index b18e36f..24d4bfb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -169,22 +169,22 @@ public class StripedBlockUtil {
// blkIdxInGroup is the index of the block in the striped block group
// E.g., blk_2 is the 3rd block in the group
final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
- results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
- startInBlk % cellSize;
+ results[blkIdxInGroup].setStartOffsetInBlock(cellSize * cellIdxInBlk +
+ startInBlk % cellSize);
boolean crossStripe = false;
for (int i = 1; i < dataBlkNum; i++) {
if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
cellIdxInBlk++;
crossStripe = true;
}
- results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
- cellSize * cellIdxInBlk;
+ results[(blkIdxInGroup + i) % dataBlkNum].setStartOffsetInBlock(
+ cellSize * cellIdxInBlk);
}
int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
results[blkIdxInGroup].lengths.add(firstCellLen);
- results[blkIdxInGroup].readLength += firstCellLen;
+ results[blkIdxInGroup].addReadLength(firstCellLen);
int i = (blkIdxInGroup + 1) % dataBlkNum;
for (int done = firstCellLen; done < len; done += cellSize) {
@@ -192,7 +192,7 @@ public class StripedBlockUtil {
rp.offsetsInBuf.add(done + bufOffset);
final int readLen = Math.min(len - done, cellSize);
rp.lengths.add(readLen);
- rp.readLength += readLen;
+ rp.addReadLength(readLen);
i = (i + 1) % dataBlkNum;
}
return results;
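The cell walk above (a possibly partial first cell on the starting block, then full cells advancing round-robin across the data blocks) can be condensed into a simplified model; the name and the int[] return type are ours, and it tracks only per-block read lengths rather than full ReadPortion objects:

static int[] planReadLengths(long startInBlk, int len, int cellSize, int dataBlkNum) {
  int[] readLengths = new int[dataBlkNum];
  int blkIdx = (int) (startInBlk / cellSize % dataBlkNum);
  // The first cell may be partial if startInBlk is not cell-aligned.
  int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
  readLengths[blkIdx] += firstCellLen;
  // Remaining bytes advance one data block per cell, wrapping round-robin.
  int i = (blkIdx + 1) % dataBlkNum;
  for (int done = firstCellLen; done < len; done += cellSize) {
    readLengths[i] += Math.min(len - done, cellSize);
    i = (i + 1) % dataBlkNum;
  }
  return readLengths;
}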
@@ -274,8 +274,8 @@ public class StripedBlockUtil {
* | (partial) | (from blk_1 and blk_2) | |
* +------------------------------------------------------+
*/
- public long startOffsetInBlock = 0;
- public int readLength = 0;
+ private long startOffsetInBlock = 0;
+ private int readLength = 0;
public final List<Integer> offsetsInBuf = new ArrayList<>();
public final List<Integer> lengths = new ArrayList<>();
@@ -295,10 +295,20 @@ public class StripedBlockUtil {
return lens;
}
- public boolean containsReadPortion(ReadPortion rp) {
- long end = startOffsetInBlock + readLength;
- return startOffsetInBlock <= rp.startOffsetInBlock && end >=
- rp.startOffsetInBlock + rp.readLength;
+ public long getStartOffsetInBlock() {
+ return startOffsetInBlock;
+ }
+
+ public int getReadLength() {
+ return readLength;
+ }
+
+ public void setStartOffsetInBlock(long startOffsetInBlock) {
+ this.startOffsetInBlock = startOffsetInBlock;
+ }
+
+ void addReadLength(int extraLength) {
+ this.readLength += extraLength;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index bcfc74b..11cdf7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -158,7 +158,7 @@ public class TestDFSStripedInputStream {
private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
throws IOException {
Path testPath = new Path(src);
- byte[] bytes = generateBytes(writeBytes);
+ final byte[] bytes = generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
//check file length
@@ -175,7 +175,8 @@ public class TestDFSStripedInputStream {
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
- Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
+ Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+ buf[i]);
}
}
@@ -190,12 +191,12 @@ public class TestDFSStripedInputStream {
readLen += ret;
}
} while (ret >= 0);
-
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
- Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
+ Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+ buf[i]);
}
}
@@ -214,8 +215,47 @@ public class TestDFSStripedInputStream {
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
- Assert.assertEquals("Byte at i should be the same", getByte(i), buf.array()[i]);
+ Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+ buf.array()[i]);
}
}
+
+ // stateful read with 1KB size byte array
+ try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+ final byte[] result = new byte[writeBytes];
+ final byte[] buf = new byte[1024];
+ int readLen = 0;
+ int ret;
+ do {
+ ret = fsdis.read(buf, 0, buf.length);
+ if (ret > 0) {
+ System.arraycopy(buf, 0, result, readLen, ret);
+ readLen += ret;
+ }
+ } while (ret >= 0);
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ Assert.assertArrayEquals(bytes, result);
+ }
+
+ // stateful read using ByteBuffer with 1KB size
+ try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+ final ByteBuffer result = ByteBuffer.allocate(writeBytes);
+ final ByteBuffer buf = ByteBuffer.allocate(1024);
+ int readLen = 0;
+ int ret;
+ do {
+ ret = fsdis.read(buf);
+ if (ret > 0) {
+ readLen += ret;
+ buf.flip();
+ result.put(buf);
+ buf.clear();
+ }
+ } while (ret >= 0);
+ Assert.assertEquals("The length of file should be the same to write size",
+ writeBytes, readLen);
+ Assert.assertArrayEquals(bytes, result.array());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
index 3b5787a..75d0587 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
@@ -38,8 +38,8 @@ public class TestPlanReadPortions {
assertEquals(GROUP_SIZE, results.length);
for (int i = 0; i < GROUP_SIZE; i++) {
- assertEquals(readLengths[i], results[i].readLength);
- assertEquals(offsetsInBlock[i], results[i].startOffsetInBlock);
+ assertEquals(readLengths[i], results[i].getReadLength());
+ assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
final int[] bOffsets = results[i].getOffsets();
assertArrayEquals(bufferOffsets[i], bOffsets);
final int[] bLengths = results[i].getLengths();