Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/17 01:58:50 UTC

[22/50] hadoop git commit: HDFS-8281. Erasure Coding: implement parallel stateful reading for striped layout. Contributed by Jing Zhao.

HDFS-8281. Erasure Coding: implement parallel stateful reading for striped layout. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67a22d24
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67a22d24
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67a22d24

Branch: refs/heads/HDFS-7285
Commit: 67a22d24cfec08602344766b5eb0d15ef82bf79d
Parents: 4bae0b1
Author: Jing Zhao <ji...@apache.org>
Authored: Mon May 4 14:44:58 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:16:04 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  26 +++
 .../hadoop/hdfs/DFSStripedInputStream.java      | 217 +++++++++++++------
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  34 ++-
 .../hadoop/hdfs/TestDFSStripedInputStream.java  |  50 ++++-
 .../hadoop/hdfs/TestPlanReadPortions.java       |   4 +-
 6 files changed, 246 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index e30b2ed..77272e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -161,3 +161,6 @@
 
     HDFS-8316. Erasure coding: refactor EC constants to be consistent with HDFS-8249.
     (Zhe Zhang via jing9)
+
+    HDFS-8281. Erasure Coding: implement parallel stateful reading for striped layout.
+    (jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index bef4da0..ca799fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -716,6 +716,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   interface ReaderStrategy {
     public int doRead(BlockReader blockReader, int off, int len)
         throws ChecksumException, IOException;
+
+    /**
+     * Copy data from the src ByteBuffer into the read buffer.
+     * @param src The src buffer where the data is copied from
+     * @param offset Useful only when the ReadStrategy is based on a byte array.
+     *               Indicate the offset of the byte array for copy.
+     * @param length Useful only when the ReadStrategy is based on a byte array.
+     *               Indicate the length of the data to copy.
+     */
+    public int copyFrom(ByteBuffer src, int offset, int length);
   }
 
   protected void updateReadStatistics(ReadStatistics readStatistics,
@@ -749,6 +759,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       updateReadStatistics(readStatistics, nRead, blockReader);
       return nRead;
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
+      ByteBuffer writeSlice = src.duplicate();
+      writeSlice.get(buf, offset, length);
+      return length;
+    }
   }
 
   /**
@@ -782,6 +799,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         }
       } 
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
+      ByteBuffer writeSlice = src.duplicate();
+      int remaining = Math.min(buf.remaining(), writeSlice.remaining());
+      writeSlice.limit(writeSlice.position() + remaining);
+      buf.put(writeSlice);
+      return remaining;
+    }
   }
 
   /* This is a used by regular read() and handles ChecksumExceptions.

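The two copyFrom() implementations added above follow the same pattern: work on a duplicate of the source buffer so the shared stripe buffer's read position is never disturbed, cap the copy at whatever the destination can still hold, and return the number of bytes actually copied. A minimal, self-contained sketch of that pattern (the class and method names below are illustrative only, not part of the patch):

    import java.nio.ByteBuffer;

    public class CopyFromSketch {
      /** Copy up to dst.remaining() bytes from src without disturbing src's position. */
      static int copyInto(ByteBuffer src, ByteBuffer dst) {
        ByteBuffer slice = src.duplicate();                 // duplicate so src's cursor is untouched
        int n = Math.min(dst.remaining(), slice.remaining());
        slice.limit(slice.position() + n);                  // cap the copy at n bytes
        dst.put(slice);
        return n;
      }

      public static void main(String[] args) {
        ByteBuffer stripe = ByteBuffer.wrap("abcdefgh".getBytes());
        ByteBuffer dst = ByteBuffer.allocate(4);
        int copied = copyInto(stripe, dst);
        // prints: copied 4 bytes, source position still 0
        System.out.println("copied " + copied + " bytes, source position still " + stripe.position());
      }
    }
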
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 0dc98fd..13c4743 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
@@ -37,6 +38,7 @@ import java.util.Set;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.CancellationException;
@@ -62,7 +64,7 @@ import java.util.concurrent.Future;
  * +------+  <- A cell contains {@link #cellSize} bytes of data
  *
  * Three styles of read will eventually be supported:
- *   1. Stateful read: TODO: HDFS-8033
+ *   1. Stateful read
  *   2. pread without decode support
  *     This is implemented by calculating the portion of read from each block and
  *     issuing requests to each DataNode in parallel.
@@ -91,12 +93,38 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
   }
 
+  /** Used to indicate the buffered data's range in the block group */
+  private static class StripeRange {
+    /** start offset in the block group (inclusive) */
+    final long offsetInBlock;
+    /** length of the stripe range */
+    final long length;
+
+    StripeRange(long offsetInBlock, long length) {
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      this.offsetInBlock = offsetInBlock;
+      this.length = length;
+    }
+
+    boolean include(long pos) {
+      return pos >= offsetInBlock && pos < offsetInBlock + length;
+    }
+  }
+
   private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
   private final BlockReader[] blockReaders = new BlockReader[groupSize];
   private final DatanodeInfo[] currentNodes = new DatanodeInfo[groupSize];
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
+  /** the buffer for a complete stripe */
+  private ByteBuffer curStripeBuf;
+  /**
+   * indicate the start/end offset of the current buffered stripe in the
+   * block group
+   */
+  private StripeRange curStripeRange;
+  private final CompletionService<Integer> readingService;
 
   DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
       ECInfo ecInfo) throws IOException {
@@ -106,7 +134,20 @@ public class DFSStripedInputStream extends DFSInputStream {
     cellSize = ecInfo.getSchema().getChunkSize();
     dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
     parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();
-    DFSClient.LOG.debug("Creating an striped input stream for file " + src);
+    curStripeRange = new StripeRange(0, 0);
+    readingService =
+        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Creating an striped input stream for file " + src);
+    }
+  }
+
+  private void resetCurStripeBuffer() {
+    if (curStripeBuf == null) {
+      curStripeBuf = ByteBuffer.allocateDirect(cellSize * dataBlkNum);
+    }
+    curStripeBuf.clear();
+    curStripeRange = new StripeRange(0, 0);
   }
 
   @Override
@@ -141,7 +182,7 @@ public class DFSStripedInputStream extends DFSInputStream {
         targetBlockGroup.getBlockSize() - 1;
     currentLocatedBlock = targetBlockGroup;
 
-    long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
+    final long offsetIntoBlockGroup = getOffsetInBlockGroup();
     LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
         targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
     // The purpose is to get start offset into each block
@@ -156,8 +197,8 @@ public class DFSStripedInputStream extends DFSInputStream {
         if (retval != null) {
           currentNodes[i] = retval.info;
           blockReaders[i] = getBlockReaderWithRetry(targetBlock,
-              readPortions[i].startOffsetInBlock,
-              targetBlock.getBlockSize() - readPortions[i].startOffsetInBlock,
+              readPortions[i].getStartOffsetInBlock(),
+              targetBlock.getBlockSize() - readPortions[i].getStartOffsetInBlock(),
               retval.addr, retval.storageType, retval.info, target, retry);
         }
       }
@@ -203,6 +244,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    */
   @Override
   protected void closeCurrentBlockReaders() {
+    resetCurStripeBuffer();
     if (blockReaders ==  null || blockReaders.length == 0) {
       return;
     }
@@ -220,6 +262,73 @@ public class DFSStripedInputStream extends DFSInputStream {
     blockEnd = -1;
   }
 
+  private long getOffsetInBlockGroup() {
+    return pos - currentLocatedBlock.getStartOffset();
+  }
+
+  /**
+   * Read a new stripe covering the current position, and store the data in the
+   * {@link #curStripeBuf}.
+   */
+  private void readOneStripe(
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    resetCurStripeBuffer();
+
+    // compute stripe range based on pos
+    final long offsetInBlockGroup = getOffsetInBlockGroup();
+    final long stripeLen = cellSize * dataBlkNum;
+    int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
+    curStripeRange = new StripeRange(stripeIndex * stripeLen,
+        Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen),
+            stripeLen));
+    final int numCell = (int) ((curStripeRange.length - 1) / cellSize + 1);
+
+    // read the whole stripe in parallel
+    Map<Future<Integer>, Integer> futures = new HashMap<>();
+    for (int i = 0; i < numCell; i++) {
+      curStripeBuf.position(cellSize * i);
+      curStripeBuf.limit((int) Math.min(cellSize * (i + 1),
+          curStripeRange.length));
+      ByteBuffer buf = curStripeBuf.slice();
+      ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
+      final int targetLength = buf.remaining();
+      Callable<Integer> readCallable = readCell(blockReaders[i],
+          currentNodes[i], strategy, targetLength, corruptedBlockMap);
+      Future<Integer> request = readingService.submit(readCallable);
+      futures.put(request, i);
+    }
+    while (!futures.isEmpty()) {
+      try {
+        waitNextCompletion(readingService, futures);
+        // TODO: decode and record bad reader if necessary
+      } catch (InterruptedException ignored) {
+        // ignore and retry
+      }
+    }
+  }
+
+  private Callable<Integer> readCell(final BlockReader reader,
+      final DatanodeInfo datanode, final ByteBufferStrategy strategy,
+      final int targetLength,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    return new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        int result = 0;
+        while (result < targetLength) {
+          int ret = readBuffer(reader, datanode, strategy, corruptedBlockMap);
+          if (ret < 0) {
+            throw new IOException("Unexpected EOS from the reader");
+          }
+          result += ret;
+        }
+        updateReadStatistics(readStatistics, targetLength, reader);
+        return result;
+      }
+    };
+  }
+
   @Override
   protected synchronized int readWithStrategy(ReaderStrategy strategy,
       int off, int len) throws IOException {
@@ -227,11 +336,10 @@ public class DFSStripedInputStream extends DFSInputStream {
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
-    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
+        new ConcurrentHashMap<>();
     failures = 0;
     if (pos < getFileLength()) {
-      /** Index of the target block in a stripe to read from */
-      int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
       try {
         if (pos > blockEnd) {
           blockSeekTo(pos);
@@ -247,40 +355,13 @@ public class DFSStripedInputStream extends DFSInputStream {
         /** Number of bytes already read into buffer */
         int result = 0;
         while (result < realLen) {
-          /**
-           * Temporary position into the file; {@link pos} might not proceed
-           * to this temporary position in case of exceptions.
-           */
-          long tmpPos = pos + result;
-          /** Start and end offsets of a cell in the file */
-          long cellStart = (tmpPos / cellSize) * cellSize;
-          long cellEnd = cellStart + cellSize - 1;
-
-          /** Number of bytes to read from the current cell */
-          int realLenInCell = (int) Math.min(realLen - result,
-              cellEnd - tmpPos + 1L);
-          assert realLenInCell > 0 : "Temporary position shouldn't be "
-              + "after cellEnd";
-
-          // Read from one blockReader up to cell boundary
-          int cellRet = readBuffer(blockReaders[idxInGroup],
-              currentNodes[idxInGroup], strategy, off + result, realLenInCell,
-              corruptedBlockMap);
-          if (cellRet >= 0) {
-            result += cellRet;
-            if (cellRet < realLenInCell) {
-              // A short read indicates the current blockReader buffer is
-              // already drained. Should return the read call. Otherwise
-              // should proceed to the next cell.
-              break;
-            }
-          } else {
-            // got a EOS from reader though we expect more data on it.
-            throw new IOException("Unexpected EOS from the reader");
+          if (!curStripeRange.include(getOffsetInBlockGroup())) {
+            readOneStripe(corruptedBlockMap);
           }
-          idxInGroup = (idxInGroup + 1) % dataBlkNum;
+          int ret = copy(strategy, off + result, realLen - result);
+          result += ret;
+          pos += ret;
         }
-        pos += result;
         if (dfsClient.stats != null) {
           dfsClient.stats.incrementBytesRead(result);
         }
@@ -295,11 +376,11 @@ public class DFSStripedInputStream extends DFSInputStream {
     return -1;
   }
 
-  private synchronized int readBuffer(BlockReader blockReader,
-      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len,
+  private int readBuffer(BlockReader blockReader,
+      DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     try {
-      return readerStrategy.doRead(blockReader, off, len);
+      return readerStrategy.doRead(blockReader, 0, 0);
     } catch ( ChecksumException ce ) {
       DFSClient.LOG.warn("Found Checksum error for "
           + getCurrentBlock() + " from " + currentNode
@@ -312,26 +393,25 @@ public class DFSStripedInputStream extends DFSInputStream {
           + getCurrentBlock() + " of " + src + " from "
           + currentNode, e);
     }
-    // TODO: this should trigger decoding logic (HDFS-7678)
     return -1;
   }
 
-  protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
-    ByteBufferStrategy(ByteBuffer buf) {
-      super(buf);
-    }
-
-    @Override
-    public int doRead(BlockReader blockReader, int off, int len)
-        throws IOException {
-      int oldlimit = buf.limit();
-      if (buf.remaining() > len) {
-        buf.limit(buf.position() + len);
-      }
-      int ret = super.doRead(blockReader, off, len);
-      buf.limit(oldlimit);
-      return ret;
-    }
+  /**
+   * Copy the data from {@link #curStripeBuf} into the given buffer
+   * @param strategy the ReaderStrategy containing the given buffer
+   * @param offset the offset of the given buffer. Used only when strategy is
+   *               a ByteArrayStrategy
+   * @param length target length
+   * @return number of bytes copied
+   */
+  private int copy(ReaderStrategy strategy, int offset, int length) {
+    final long stripeLen = cellSize * dataBlkNum;
+    final long offsetInBlk = pos - currentLocatedBlock.getStartOffset();
+    // compute the position in the curStripeBuf based on "pos"
+    int bufOffset = (int) (offsetInBlk % stripeLen);
+    curStripeBuf.position(bufOffset);
+    return strategy.copyFrom(curStripeBuf, offset,
+        Math.min(length, curStripeBuf.remaining()));
   }
 
   /**
@@ -366,8 +446,7 @@ public class DFSStripedInputStream extends DFSInputStream {
       DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
           + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
     }
-    return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize,
-        dataBlkNum, idx);
+    return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize, dataBlkNum, idx);
   }
 
   private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
@@ -404,7 +483,7 @@ public class DFSStripedInputStream extends DFSInputStream {
 
     for (short i = 0; i < dataBlkNum; i++) {
       ReadPortion rp = readPortions[i];
-      if (rp.readLength <= 0) {
+      if (rp.getReadLength() <= 0) {
         continue;
       }
       DatanodeInfo loc = blks[i].getLocations()[0];
@@ -413,8 +492,8 @@ public class DFSStripedInputStream extends DFSInputStream {
           loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
           type);
       Callable<Void> readCallable = getFromOneDataNode(dnAddr,
-          blks[i].getStartOffset(), rp.startOffsetInBlock,
-          rp.startOffsetInBlock + rp.readLength - 1, buf,
+          blks[i].getStartOffset(), rp.getStartOffsetInBlock(),
+          rp.getStartOffsetInBlock() + rp.getReadLength() - 1, buf,
           rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
       Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
       DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
@@ -451,14 +530,14 @@ public class DFSStripedInputStream extends DFSInputStream {
     };
   }
 
-  private void waitNextCompletion(CompletionService<Void> stripedReadsService,
-      Map<Future<Void>, Integer> futures) throws InterruptedException {
+  private <T> void waitNextCompletion(CompletionService<T> service,
+      Map<Future<T>, Integer> futures) throws InterruptedException {
     if (futures.isEmpty()) {
       throw new InterruptedException("Futures already empty");
     }
-    Future<Void> future = null;
+    Future<T> future = null;
     try {
-      future = stripedReadsService.take();
+      future = service.take();
       future.get();
       futures.remove(future);
     } catch (ExecutionException | CancellationException e) {

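The core of the stateful-read path added above: whenever the current position falls outside curStripeRange, readOneStripe() fetches a whole stripe, one cell per internal block, in parallel through a CompletionService, and the caller's read() is then served by copying out of curStripeBuf. A minimal, self-contained simulation of that flow, with byte arrays standing in for the per-block readers (class and variable names here are illustrative, not part of the patch):

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class StripeReadSketch {
      static final int CELL_SIZE = 4;
      static final int DATA_BLK_NUM = 3;

      public static void main(String[] args) throws Exception {
        // Stand-ins for one cell from each internal block of a block group.
        byte[][] cells = { "AAAA".getBytes(), "BBBB".getBytes(), "CCCC".getBytes() };

        ExecutorService pool = Executors.newFixedThreadPool(DATA_BLK_NUM);
        CompletionService<Integer> readingService = new ExecutorCompletionService<>(pool);
        ByteBuffer curStripeBuf = ByteBuffer.allocate(CELL_SIZE * DATA_BLK_NUM);

        // readOneStripe(): one read task per cell, each filling its own slice of the stripe buffer.
        Map<Future<Integer>, Integer> futures = new HashMap<>();
        for (int i = 0; i < DATA_BLK_NUM; i++) {
          curStripeBuf.position(CELL_SIZE * i);
          curStripeBuf.limit(CELL_SIZE * (i + 1));
          final ByteBuffer cellBuf = curStripeBuf.slice();
          final byte[] src = cells[i];
          futures.put(readingService.submit(() -> { cellBuf.put(src); return src.length; }), i);
        }

        // waitNextCompletion(): drain all cell reads before serving data to the caller.
        while (!futures.isEmpty()) {
          Future<Integer> done = readingService.take();
          done.get();
          futures.remove(done);
        }
        pool.shutdown();

        // copy(): a subsequent read() is served straight out of the buffered stripe.
        curStripeBuf.clear();
        byte[] out = new byte[CELL_SIZE * DATA_BLK_NUM];
        curStripeBuf.get(out);
        System.out.println(new String(out));   // prints AAAABBBBCCCC
      }
    }
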
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index b18e36f..24d4bfb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -169,22 +169,22 @@ public class StripedBlockUtil {
     // blkIdxInGroup is the index of the block in the striped block group
     // E.g., blk_2 is the 3rd block in the group
     final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
-    results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
-        startInBlk % cellSize;
+    results[blkIdxInGroup].setStartOffsetInBlock(cellSize * cellIdxInBlk +
+        startInBlk % cellSize);
     boolean crossStripe = false;
     for (int i = 1; i < dataBlkNum; i++) {
       if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
         cellIdxInBlk++;
         crossStripe = true;
       }
-      results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
-          cellSize * cellIdxInBlk;
+      results[(blkIdxInGroup + i) % dataBlkNum].setStartOffsetInBlock(
+          cellSize * cellIdxInBlk);
     }
 
     int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
     results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
     results[blkIdxInGroup].lengths.add(firstCellLen);
-    results[blkIdxInGroup].readLength += firstCellLen;
+    results[blkIdxInGroup].addReadLength(firstCellLen);
 
     int i = (blkIdxInGroup + 1) % dataBlkNum;
     for (int done = firstCellLen; done < len; done += cellSize) {
@@ -192,7 +192,7 @@ public class StripedBlockUtil {
       rp.offsetsInBuf.add(done + bufOffset);
       final int readLen = Math.min(len - done, cellSize);
       rp.lengths.add(readLen);
-      rp.readLength += readLen;
+      rp.addReadLength(readLen);
       i = (i + 1) % dataBlkNum;
     }
     return results;
@@ -274,8 +274,8 @@ public class StripedBlockUtil {
      * |  (partial)  |    (from blk_1 and blk_2)   |          |
      * +------------------------------------------------------+
      */
-    public long startOffsetInBlock = 0;
-    public int readLength = 0;
+    private long startOffsetInBlock = 0;
+    private int readLength = 0;
     public final List<Integer> offsetsInBuf = new ArrayList<>();
     public final List<Integer> lengths = new ArrayList<>();
 
@@ -295,10 +295,20 @@ public class StripedBlockUtil {
       return lens;
     }
 
-    public boolean containsReadPortion(ReadPortion rp) {
-      long end = startOffsetInBlock + readLength;
-      return startOffsetInBlock <= rp.startOffsetInBlock && end >=
-          rp.startOffsetInBlock + rp.readLength;
+    public long getStartOffsetInBlock() {
+      return startOffsetInBlock;
+    }
+
+    public int getReadLength() {
+      return readLength;
+    }
+
+    public void setStartOffsetInBlock(long startOffsetInBlock) {
+      this.startOffsetInBlock = startOffsetInBlock;
+    }
+
+    void addReadLength(int extraLength) {
+      this.readLength += extraLength;
     }
   }
 

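ReadPortion's fields are now private, so both the planner in StripedBlockUtil and its callers go through the new accessors (setStartOffsetInBlock, addReadLength, getStartOffsetInBlock, getReadLength). A small self-contained mirror of that accessor surface, showing how a planner-style caller would populate a portion and read it back (illustrative only; the real class lives in StripedBlockUtil):

    import java.util.ArrayList;
    import java.util.List;

    /** Stand-in that mirrors the ReadPortion accessors introduced by this patch. */
    public class ReadPortionSketch {
      private long startOffsetInBlock = 0;
      private int readLength = 0;
      public final List<Integer> offsetsInBuf = new ArrayList<>();
      public final List<Integer> lengths = new ArrayList<>();

      public long getStartOffsetInBlock() { return startOffsetInBlock; }
      public int getReadLength() { return readLength; }
      public void setStartOffsetInBlock(long startOffsetInBlock) { this.startOffsetInBlock = startOffsetInBlock; }
      void addReadLength(int extraLength) { this.readLength += extraLength; }

      public static void main(String[] args) {
        // The planner sets the start offset once, then accumulates one length per cell it assigns.
        ReadPortionSketch rp = new ReadPortionSketch();
        rp.setStartOffsetInBlock(64 * 1024);
        rp.offsetsInBuf.add(0);
        rp.lengths.add(64 * 1024);
        rp.addReadLength(64 * 1024);
        System.out.println(rp.getStartOffsetInBlock() + " / " + rp.getReadLength());
      }
    }
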
http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index bcfc74b..11cdf7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -158,7 +158,7 @@ public class TestDFSStripedInputStream {
   private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
       throws IOException {
     Path testPath = new Path(src);
-    byte[] bytes = generateBytes(writeBytes);
+    final byte[] bytes = generateBytes(writeBytes);
     DFSTestUtil.writeFile(fs, testPath, new String(bytes));
 
     //check file length
@@ -175,7 +175,8 @@ public class TestDFSStripedInputStream {
       Assert.assertEquals("The length of file should be the same to write size",
           writeBytes, readLen);
       for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
+        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+            buf[i]);
       }
     }
 
@@ -190,12 +191,12 @@ public class TestDFSStripedInputStream {
           readLen += ret;
         }
       } while (ret >= 0);
-
       readLen = readLen >= 0 ? readLen : 0;
       Assert.assertEquals("The length of file should be the same to write size",
           writeBytes, readLen);
       for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
+        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+            buf[i]);
       }
     }
 
@@ -214,8 +215,47 @@ public class TestDFSStripedInputStream {
       Assert.assertEquals("The length of file should be the same to write size",
           writeBytes, readLen);
       for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at i should be the same", getByte(i), buf.array()[i]);
+        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+            buf.array()[i]);
       }
     }
+
+    // stateful read with 1KB size byte array
+    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+      final byte[] result = new byte[writeBytes];
+      final byte[] buf = new byte[1024];
+      int readLen = 0;
+      int ret;
+      do {
+        ret = fsdis.read(buf, 0, buf.length);
+        if (ret > 0) {
+          System.arraycopy(buf, 0, result, readLen, ret);
+          readLen += ret;
+        }
+      } while (ret >= 0);
+      Assert.assertEquals("The length of file should be the same to write size",
+          writeBytes, readLen);
+      Assert.assertArrayEquals(bytes, result);
+    }
+
+    // stateful read using ByteBuffer with 1KB size
+    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+      final ByteBuffer result = ByteBuffer.allocate(writeBytes);
+      final ByteBuffer buf = ByteBuffer.allocate(1024);
+      int readLen = 0;
+      int ret;
+      do {
+        ret = fsdis.read(buf);
+        if (ret > 0) {
+          readLen += ret;
+          buf.flip();
+          result.put(buf);
+          buf.clear();
+        }
+      } while (ret >= 0);
+      Assert.assertEquals("The length of file should be the same to write size",
+          writeBytes, readLen);
+      Assert.assertArrayEquals(bytes, result.array());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67a22d24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
index 3b5787a..75d0587 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
@@ -38,8 +38,8 @@ public class TestPlanReadPortions {
     assertEquals(GROUP_SIZE, results.length);
 
     for (int i = 0; i < GROUP_SIZE; i++) {
-      assertEquals(readLengths[i], results[i].readLength);
-      assertEquals(offsetsInBlock[i], results[i].startOffsetInBlock);
+      assertEquals(readLengths[i], results[i].getReadLength());
+      assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
       final int[] bOffsets = results[i].getOffsets();
       assertArrayEquals(bufferOffsets[i], bOffsets);
       final int[] bLengths = results[i].getLengths();