You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/17 01:58:43 UTC

[15/50] hadoop git commit: HDFS-8033. Erasure coding: stateful (non-positional) read from files in striped layout. Contributed by Zhe Zhang.

HDFS-8033. Erasure coding: stateful (non-positional) read from files in striped layout. Contributed by Zhe Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8417a50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8417a50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8417a50

Branch: refs/heads/HDFS-7285
Commit: d8417a50bd0418b16b489065670cbdf4cab1cfd3
Parents: f5c6ec6
Author: Zhe Zhang <zh...@apache.org>
Authored: Fri Apr 24 22:36:15 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:15:23 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  55 ++--
 .../hadoop/hdfs/DFSStripedInputStream.java      | 311 ++++++++++++++++++-
 .../hadoop/hdfs/TestDFSStripedInputStream.java  |  43 +++
 .../apache/hadoop/hdfs/TestReadStripedFile.java | 110 ++++++-
 5 files changed, 465 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8417a50/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index cf41a9b..e8db485 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -131,3 +131,6 @@
 
     HDFS-8228. Erasure Coding: SequentialBlockGroupIdGenerator#nextValue may cause 
     block id conflicts (Jing Zhao via Zhe Zhang)
+
+    HDFS-8033. Erasure coding: stateful (non-positional) read from files in 
+    striped layout (Zhe Zhang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8417a50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 16250dd..6eb25d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -95,34 +95,34 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
   protected final DFSClient dfsClient;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private final String src;
-  private final boolean verifyChecksum;
+  protected AtomicBoolean closed = new AtomicBoolean(false);
+  protected final String src;
+  protected final boolean verifyChecksum;
 
   // state by stateful read only:
   // (protected by lock on this)
   /////
   private DatanodeInfo currentNode = null;
-  private LocatedBlock currentLocatedBlock = null;
-  private long pos = 0;
-  private long blockEnd = -1;
+  protected LocatedBlock currentLocatedBlock = null;
+  protected long pos = 0;
+  protected long blockEnd = -1;
   private BlockReader blockReader = null;
   ////
 
   // state shared by stateful and positional read:
   // (protected by lock on infoLock)
   ////
-  private LocatedBlocks locatedBlocks = null;
+  protected LocatedBlocks locatedBlocks = null;
   private long lastBlockBeingWrittenLength = 0;
   private FileEncryptionInfo fileEncryptionInfo = null;
-  private CachingStrategy cachingStrategy;
+  protected CachingStrategy cachingStrategy;
   ////
 
-  private final ReadStatistics readStatistics = new ReadStatistics();
+  protected final ReadStatistics readStatistics = new ReadStatistics();
   // lock for state shared between read and pread
   // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
   //       (it's OK to acquire this lock when the lock on <this> is held)
-  private final Object infoLock = new Object();
+  protected final Object infoLock = new Object();
 
   /**
    * Track the ByteBuffers that we have handed out to readers.
@@ -239,7 +239,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * back to the namenode to get a new list of block locations, and is
    * capped at maxBlockAcquireFailures
    */
-  private int failures = 0;
+  protected int failures = 0;
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
    * parallel accesses to DFSInputStream (through ptreads) properly */
@@ -476,7 +476,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Fetch a block from namenode and cache it */
-  private void fetchBlockAt(long offset) throws IOException {
+  protected void fetchBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
@@ -579,7 +579,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
 
     // Will be getting a new BlockReader.
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
 
     //
     // Connect to best DataNode for desired Block, with potential offset
@@ -620,7 +620,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         return chosenNode;
       } catch (IOException ex) {
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
-          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + targetAddr
               + " : " + ex);
           // The encryption key used is invalid.
@@ -696,7 +696,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           "unreleased ByteBuffers allocated by read().  " +
           "Please release " + builder.toString() + ".");
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
     super.close();
   }
 
@@ -718,7 +718,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         throws ChecksumException, IOException;
   }
 
-  private void updateReadStatistics(ReadStatistics readStatistics, 
+  protected void updateReadStatistics(ReadStatistics readStatistics,
         int nRead, BlockReader blockReader) {
     if (nRead <= 0) return;
     synchronized(infoLock) {
@@ -754,7 +754,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    * Used to read bytes into a user-supplied ByteBuffer
    */
-  private class ByteBufferStrategy implements ReaderStrategy {
+  protected class ByteBufferStrategy implements ReaderStrategy {
     final ByteBuffer buf;
     ByteBufferStrategy(ByteBuffer buf) {
       this.buf = buf;
@@ -770,6 +770,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         int ret = blockReader.read(buf);
         success = true;
         updateReadStatistics(readStatistics, ret, blockReader);
+        if (ret == 0) {
+          DFSClient.LOG.warn("zero");
+        }
         return ret;
       } finally {
         if (!success) {
@@ -837,7 +840,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
@@ -926,7 +929,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    * Add corrupted block replica into map.
    */
-  private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, 
+  protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     Set<DatanodeInfo> dnSet = null;
     if((corruptedBlockMap.containsKey(blk))) {
@@ -996,7 +999,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param ignoredNodes Do not choose nodes in this array (may be null)
    * @return The DNAddrPair of the best node. Null if no node can be chosen.
    */
-  private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
+  protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) {
     DatanodeInfo[] nodes = block.getLocations();
     StorageType[] storageTypes = block.getStorageTypes();
@@ -1365,7 +1368,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return true if block access token has expired or invalid and it should be
    *         refetched
    */
-  private static boolean tokenRefetchNeeded(IOException ex,
+  protected static boolean tokenRefetchNeeded(IOException ex,
       InetSocketAddress targetAddr) {
     /*
      * Get a new access token and retry. Retry is needed in 2 cases. 1)
@@ -1472,7 +1475,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param corruptedBlockMap map of corrupted blocks
    * @param dataNodeCount number of data nodes who contains the block replicas
    */
-  private void reportCheckSumFailure(
+  protected void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, 
       int dataNodeCount) {
     if (corruptedBlockMap.isEmpty()) {
@@ -1669,7 +1672,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private void closeCurrentBlockReader() {
+  protected void closeCurrentBlockReaders() {
     if (blockReader == null) return;
     // Close the current block reader so that the new caching settings can 
     // take effect immediately.
@@ -1689,7 +1692,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
           new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 
   @Override
@@ -1699,7 +1702,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
           new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 
   /**
@@ -1857,6 +1860,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
   @Override
   public synchronized void unbuffer() {
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8417a50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d0e2b68..fe9e101 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -18,20 +18,21 @@
 package org.apache.hadoop.hdfs;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import org.apache.hadoop.hdfs.protocol.ECInfo;
-import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -125,6 +126,9 @@ public class DFSStripedInputStream extends DFSInputStream {
     return results;
   }
 
+  private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
+  private BlockReader[] blockReaders = null;
+  private DatanodeInfo[] currentNodes = null;
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
@@ -143,13 +147,285 @@ public class DFSStripedInputStream extends DFSInputStream {
 
   @Override
   public synchronized int read(final ByteBuffer buf) throws IOException {
-    throw new UnsupportedActionException("Stateful read is not supported");
+    ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
+    TraceScope scope =
+        dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
+    try {
+      return readWithStrategy(byteBufferReader, 0, buf.remaining());
+    } finally {
+      scope.close();
+    }
+  }
+
+  /**
+   * When seeking into a new block group, create blockReader for each internal
+   * block in the group.
+   */
+  @VisibleForTesting
+  private synchronized DatanodeInfo[] blockSeekTo(long target)
+      throws IOException {
+    if (target >= getFileLength()) {
+      throw new IOException("Attempted to read past end of file");
+    }
+
+    // Will be getting a new BlockReader.
+    closeCurrentBlockReaders();
+
+    // Connect to best DataNode for desired Block, with potential offset
+    DatanodeInfo[] chosenNodes = new DatanodeInfo[groupSize];
+    int refetchToken = 1; // only need to get a new access token once
+    int refetchEncryptionKey = 1; // only need to get a new encryption key once
+
+    // Compute desired striped block group
+    LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
+
+    // Update current position
+    this.pos = target;
+    this.blockEnd = targetBlockGroup.getStartOffset() +
+        targetBlockGroup.getBlockSize() - 1;
+
+    long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
+    LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
+        targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
+    // The purpose is to get start offset into each block
+    ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
+        offsetIntoBlockGroup, 0, 0);
+    while (true) {
+      int i = 0;
+      InetSocketAddress targetAddr = null;
+      try {
+        blockReaders = new BlockReader[groupSize];
+        for (i = 0; i < groupSize; i++) {
+          LocatedBlock targetBlock = targetBlocks[i];
+          if (targetBlock == null) {
+            continue;
+          }
+          long offsetIntoBlock = readPortions[i].startOffsetInBlock;
+          DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
+          chosenNodes[i] = retval.info;
+          targetAddr = retval.addr;
+          StorageType storageType = retval.storageType;
+
+          ExtendedBlock blk = targetBlock.getBlock();
+          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+          CachingStrategy curCachingStrategy;
+          boolean shortCircuitForbidden;
+          synchronized(infoLock) {
+            curCachingStrategy = cachingStrategy;
+            shortCircuitForbidden = shortCircuitForbidden();
+          }
+          blockReaders[i] = new BlockReaderFactory(dfsClient.getConf()).
+              setInetSocketAddress(targetAddr).
+              setRemotePeerFactory(dfsClient).
+              setDatanodeInfo(chosenNodes[i]).
+              setStorageType(storageType).
+              setFileName(src).
+              setBlock(blk).
+              setBlockToken(accessToken).
+              setStartOffset(offsetIntoBlock).
+              setVerifyChecksum(verifyChecksum).
+              setClientName(dfsClient.clientName).
+              setLength(blk.getNumBytes() - offsetIntoBlock).
+              setCachingStrategy(curCachingStrategy).
+              setAllowShortCircuitLocalReads(!shortCircuitForbidden).
+              setClientCacheContext(dfsClient.getClientContext()).
+              setUserGroupInformation(dfsClient.ugi).
+              setConfiguration(dfsClient.getConfiguration()).
+              build();
+        }
+        currentLocatedBlock = targetBlockGroup;
+        return chosenNodes;
+      } catch (IOException ex) {
+        // Retry in case of encryption key or token exceptions. Otherwise throw
+        // IOException: since each internal block is singly replicated, it's
+        // not meaningful trying to locate another replica.
+        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+              + "encryption key was invalid when connecting to " + targetAddr
+              + " : " + ex);
+          // The encryption key used is invalid.
+          refetchEncryptionKey--;
+          dfsClient.clearDataEncryptionKey();
+        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+          refetchToken--;
+          fetchBlockAt(target);
+        } else {
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+              + ", add to deadNodes and continue. " + ex, ex);
+          // Put chosen node into dead list and throw exception
+          addToDeadNodes(chosenNodes[i]);
+          throw ex;
+        }
+      }
+    }
+  }
+
+  /**
+   * Close the blockReader of every internal block in the current block group
+   * and reset blockEnd. (Switching to the next cell's blockReader at a cell
+   * boundary is handled in readWithStrategy, not here.)
+   */
+  @Override
+  protected void closeCurrentBlockReaders() {
+    if (blockReaders ==  null || blockReaders.length == 0) {
+      return;
+    }
+    for (int i = 0; i < groupSize; i++) {
+      if (blockReaders[i] == null) {
+        continue;
+      }
+      try {
+        blockReaders[i].close();
+      } catch (IOException e) {
+        DFSClient.LOG.error("error closing blockReader", e);
+      }
+      blockReaders[i] = null;
+    }
+    blockEnd = -1;
   }
 
   @Override
-  public synchronized int read(final byte buf[], int off, int len)
+  protected synchronized int readWithStrategy(ReaderStrategy strategy,
+      int off, int len) throws IOException {
+    dfsClient.checkOpen();
+    if (closed.get()) {
+      throw new IOException("Stream closed");
+    }
+    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
+        = new HashMap<>();
+    failures = 0;
+    if (pos < getFileLength()) {
+      int retries = 2;
+      /** Index of the target block in a stripe to read from */
+      int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
+      while (retries > 0) {
+        try {
+          // currentNodes can be left as null if previous read had a checksum
+          // error on the same block. See HDFS-3067
+          if (pos > blockEnd || currentNodes == null) {
+            currentNodes = blockSeekTo(pos);
+          }
+          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+          synchronized(infoLock) {
+            if (locatedBlocks.isLastBlockComplete()) {
+              realLen = (int) Math.min(realLen,
+                  locatedBlocks.getFileLength() - pos);
+            }
+          }
+
+          /** Number of bytes already read into buffer */
+          int result = 0;
+          while (result < realLen) {
+            /**
+             * Temporary position into the file; {@link pos} might not proceed
+             * to this temporary position in case of exceptions.
+             */
+            long tmpPos = pos + result;
+            /** Start and end offsets of a cell in the file */
+            long cellStart = (tmpPos / cellSize) * cellSize;
+            long cellEnd = cellStart + cellSize - 1;
+
+            /** Number of bytes to read from the current cell */
+            int realLenInCell = (int) Math.min(realLen - result,
+                cellEnd - tmpPos + 1L);
+            assert realLenInCell > 0 : "Temporary position shouldn't be " +
+                "after cellEnd";
+            // Read from one blockReader up to cell boundary
+            int cellRet = readBuffer(blockReaders[idxInGroup],
+                currentNodes[idxInGroup], strategy, off + result,
+                realLenInCell);
+            if (cellRet >= 0) {
+              result += cellRet;
+              if (cellRet < realLenInCell) {
+                // A short read indicates the current blockReader buffer is
+                // already drained. Should return the read call. Otherwise
+                // should proceed to the next cell.
+                break;
+              }
+            } else {
+              // got a EOS from reader though we expect more data on it.
+              throw new IOException("Unexpected EOS from the reader");
+            }
+            idxInGroup = (idxInGroup + 1) % dataBlkNum;
+          }
+
+          pos += result;
+
+          if (dfsClient.stats != null) {
+            dfsClient.stats.incrementBytesRead(result);
+          }
+          return result;
+        } catch (ChecksumException ce) {
+          throw ce;
+        } catch (IOException e) {
+          if (retries == 1) {
+            DFSClient.LOG.warn("DFS Read", e);
+          }
+          blockEnd = -1;
+          if (currentNodes[idxInGroup] != null) {
+            addToDeadNodes(currentNodes[idxInGroup]);
+          }
+          if (--retries == 0) {
+            throw e;
+          }
+        } finally {
+          // Check whether block replica corruption needs to be reported,
+          // whether the read succeeded or a ChecksumException occurred.
+          reportCheckSumFailure(corruptedBlockMap,
+              currentLocatedBlock.getLocations().length);
+        }
+      }
+    }
+    return -1;
+  }
+
+  private synchronized int readBuffer(BlockReader blockReader,
+      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len)
+      throws IOException {
+    IOException ioe;
+    while (true) {
+      try {
+        return readerStrategy.doRead(blockReader, off, len);
+      } catch ( ChecksumException ce ) {
+        DFSClient.LOG.warn("Found Checksum error for "
+            + getCurrentBlock() + " from " + currentNode
+            + " at " + ce.getPos());
+        // If current block group is corrupt, it's meaningless to retry.
+        // TODO: this should trigger decoding logic (HDFS-7678)
+        throw ce;
+      } catch ( IOException e ) {
+        ioe = e;
+      }
+
+      boolean sourceFound = seekToBlockSource(pos);
+      if (!sourceFound) {
+        throw ioe;
+      }
+    }
+  }
+
+  private boolean seekToBlockSource(long targetPos)
       throws IOException {
-    throw new UnsupportedActionException("Stateful read is not supported");
+    currentNodes = blockSeekTo(targetPos);
+    return true;
+  }
+
+  protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
+    ByteBufferStrategy(ByteBuffer buf) {
+      super(buf);
+    }
+
+    @Override
+    public int doRead(BlockReader blockReader, int off, int len)
+        throws ChecksumException, IOException {
+      int oldlimit = buf.limit();
+      if (buf.remaining() > len) {
+        buf.limit(buf.position() + len);
+      }
+      int ret = super.doRead(blockReader, off, len);
+      buf.limit(oldlimit);
+      return ret;
+    }
   }
 
   /**
@@ -188,8 +464,11 @@ public class DFSStripedInputStream extends DFSInputStream {
         dataBlkNum, idx);
   }
 
-  private LocatedBlock getBlockGroupAt(long offset) throws IOException {
-    return super.getBlockAt(offset);
+  private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
+    LocatedBlock lb = super.getBlockAt(offset);
+    assert lb instanceof LocatedStripedBlock : "NameNode" +
+        " should return a LocatedStripedBlock for a striped file";
+    return (LocatedStripedBlock)lb;
   }
 
   /**
@@ -206,10 +485,8 @@ public class DFSStripedInputStream extends DFSInputStream {
     int len = (int) (end - start + 1);
 
     // Refresh the striped block group
-    LocatedBlock block = getBlockGroupAt(blockStartOffset);
-    assert block instanceof LocatedStripedBlock : "NameNode" +
-        " should return a LocatedStripedBlock for a striped file";
-    LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
+    LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
+
 
     // Planning the portion of I/O for each shard
     ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
@@ -308,7 +585,7 @@ public class DFSStripedInputStream extends DFSInputStream {
      * +------------------------------------------------------+
      */
     private long startOffsetInBlock = 0;
-    private long readLength = 0;
+    private int readLength = 0;
     private final List<Integer> offsetsInBuf = new ArrayList<>();
     private final List<Integer> lengths = new ArrayList<>();
 
@@ -328,7 +605,7 @@ public class DFSStripedInputStream extends DFSInputStream {
       return lens;
     }
 
-    long getReadLength() {
+    int getReadLength() {
       return readLength;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8417a50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 73c7350..cf10981 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -28,6 +28,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public class TestDFSStripedInputStream {
   private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
@@ -165,6 +166,7 @@ public class TestDFSStripedInputStream {
     Assert.assertEquals("File length should be the same",
         writeBytes, fileLength);
 
+    // pread
     try (DFSStripedInputStream dis =
              new DFSStripedInputStream(fs.getClient(), src, true)) {
       byte[] buf = new byte[writeBytes + 100];
@@ -176,5 +178,46 @@ public class TestDFSStripedInputStream {
         Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
       }
     }
+
+    // stateful read with byte array
+    try (DFSStripedInputStream dis =
+             new DFSStripedInputStream(fs.getClient(), src, true)) {
+      byte[] buf = new byte[writeBytes + 100];
+      int readLen = 0;
+      int ret;
+      do {
+        ret = dis.read(buf, readLen, buf.length - readLen);
+        if (ret > 0) {
+          readLen += ret;
+        }
+      } while (ret >= 0);
+
+      readLen = readLen >= 0 ? readLen : 0;
+      Assert.assertEquals("The length of file should be the same to write size",
+          writeBytes, readLen);
+      for (int i = 0; i < writeBytes; i++) {
+        Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
+      }
+    }
+
+    // stateful read with ByteBuffer
+    try (DFSStripedInputStream dis =
+             new DFSStripedInputStream(fs.getClient(), src, true)) {
+      ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
+      int readLen = 0;
+      int ret;
+      do {
+        ret = dis.read(buf);
+        if (ret > 0) {
+          readLen += ret;
+        }
+      } while (ret >= 0);
+      readLen = readLen >= 0 ? readLen : 0;
+      Assert.assertEquals("The length of file should be the same to write size",
+          writeBytes, readLen);
+      for (int i = 0; i < writeBytes; i++) {
+        Assert.assertEquals("Byte at i should be the same", getByte(i), buf.array()[i]);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8417a50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index b0631ce..d980bd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -28,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -38,6 +39,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -52,19 +54,21 @@ public class TestReadStripedFile {
   private Path filePath = new Path(dirPath, "file");
   private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
   private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
-  private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
   private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private final int NUM_STRIPE_PER_BLOCK = 2;
-  private final int BLOCKSIZE = NUM_STRIPE_PER_BLOCK * DATA_BLK_NUM * CELLSIZE;
+  private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+  private final int BLOCK_GROUP_SIZE =  DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
 
   @Before
   public void setup() throws IOException {
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
     SimulatedFSDataset.setFactory(conf);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(BLK_GROUP_SIZE)
-        .build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        DATA_BLK_NUM + PARITY_BLK_NUM).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
+    fs.mkdirs(dirPath);
+    fs.getClient().createErasureCodingZone(dirPath.toString(), null);
   }
 
   @After
@@ -80,10 +84,10 @@ public class TestReadStripedFile {
   @Test
   public void testGetBlock() throws Exception {
     final int numBlocks = 4;
-    DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
-        NUM_STRIPE_PER_BLOCK, true);
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
-        filePath.toString(), 0, BLOCKSIZE * numBlocks);
+        filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
     final DFSStripedInputStream in =
         new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
 
@@ -103,11 +107,11 @@ public class TestReadStripedFile {
 
   @Test
   public void testPread() throws Exception {
-    final int numBlocks = 4;
-    DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
-        NUM_STRIPE_PER_BLOCK, true);
+    final int numBlocks = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
-        filePath.toString(), 0, BLOCKSIZE);
+        filePath.toString(), 0, BLOCK_GROUP_SIZE);
 
     assert lbs.get(0) instanceof LocatedStripedBlock;
     LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
@@ -121,11 +125,110 @@ public class TestReadStripedFile {
     }
     DFSStripedInputStream in =
         new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
-    int readSize = BLOCKSIZE;
+    int readSize = BLOCK_GROUP_SIZE;
     byte[] readBuffer = new byte[readSize];
     int ret = in.read(0, readBuffer, 0, readSize);
 
     assertEquals(readSize, ret);
     // TODO: verify read results with patterned data from HDFS-8117
   }
+
+  /**
+   * Runs the stateful read check three times: with a byte array, with a
+   * ByteBuffer, and with a ByteBuffer after the cluster has been restarted
+   * with a changed IO_FILE_BUFFER_SIZE_KEY setting.
+   */
+  @Test
+  public void testStatefulRead() throws Exception {
+    testStatefulRead(false, false);
+    testStatefulRead(true, false);
+    testStatefulRead(true, true);
+  }
+
+  /**
+   * Creates a striped file of two block groups, injects its data blocks
+   * into the cluster, reads the file sequentially and compares every byte
+   * with the pattern from {@link SimulatedFSDataset#simulatedByte}.
+   *
+   * @param useByteBuffer read with read(ByteBuffer) instead of
+   *          read(byte[], int, int)
+   * @param cellMisalignPacket restart the cluster with
+   *          IO_FILE_BUFFER_SIZE_KEY set to IO_FILE_BUFFER_SIZE_DEFAULT + 1
+   *          before the file is created
+   */
+  private void testStatefulRead(boolean useByteBuffer,
+      boolean cellMisalignPacket) throws Exception {
+    final int numBlocks = 2;
+    final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
+    if (cellMisalignPacket) {
+      conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1);
+      tearDown();
+      setup();
+    }
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, fileSize);
+
+    // JUnit assertions instead of the assert keyword, so the checks also run
+    // when the JVM is started without -ea.
+    assertEquals(numBlocks, lbs.getLocatedBlocks().size());
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assertTrue(lb instanceof LocatedStripedBlock);
+      LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
+      for (int i = 0; i < DATA_BLK_NUM; i++) {
+        // The constructor already sets the generation stamp.
+        Block blk = new Block(bg.getBlock().getBlockId() + i,
+            INTERNAL_BLOCK_SIZE, bg.getBlock().getGenerationStamp());
+        cluster.injectBlocks(i, Arrays.asList(blk),
+            bg.getBlock().getBlockPoolId());
+      }
+    }
+
+    byte[] expected = new byte[fileSize];
+    for (LocatedBlock bg : lbs.getLocatedBlocks()) {
+      // A variation of DFSTestUtil#fillExpectedBuf for striped blocks:
+      // i walks the stripes, j the data blocks, k the bytes of one cell.
+      for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+        for (int j = 0; j < DATA_BLK_NUM; j++) {
+          for (int k = 0; k < CELLSIZE; k++) {
+            int posInBlk = i * CELLSIZE + k;
+            int posInFile = (int) bg.getStartOffset() +
+                i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+            expected[posInFile] = SimulatedFSDataset.simulatedByte(
+                new Block(bg.getBlock().getBlockId() + j), posInBlk);
+          }
+        }
+      }
+    }
+
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(),
+            false);
+    try {
+      if (useByteBuffer) {
+        ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
+        int done = 0;
+        while (done < fileSize) {
+          int ret = in.read(readBuffer);
+          assertTrue(ret > 0);
+          done += ret;
+        }
+        assertArrayEquals(expected, readBuffer.array());
+      } else {
+        byte[] readBuffer = new byte[fileSize];
+        int done = 0;
+        while (done < fileSize) {
+          int ret = in.read(readBuffer, done, fileSize - done);
+          assertTrue(ret > 0);
+          done += ret;
+        }
+        assertArrayEquals(expected, readBuffer);
+      }
+    } finally {
+      // Close the stream even when an assertion above fails.
+      in.close();
+    }
+    fs.delete(filePath, true);
+  }
 }