Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/18 19:16:48 UTC

[37/50] hadoop git commit: HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread). Contributed by Zhe Zhang.

HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread). Contributed by Zhe Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e83d1b8b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e83d1b8b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e83d1b8b

Branch: refs/heads/HDFS-7285
Commit: e83d1b8bb3a2cbb10b6cfa734e0d8f008f4c3aff
Parents: 206ad7f
Author: Zhe Zhang <zh...@apache.org>
Authored: Mon May 11 21:10:23 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 18 10:02:01 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../hadoop/hdfs/DFSStripedInputStream.java      | 164 ++++--
 .../erasurecode/ErasureCodingWorker.java        |  10 +-
 .../hadoop/hdfs/util/StripedBlockUtil.java      | 517 +++++++++++++++++--
 .../hadoop/hdfs/TestDFSStripedInputStream.java  |  97 +++-
 .../hadoop/hdfs/TestWriteReadStripedFile.java   |  49 ++
 6 files changed, 768 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e83d1b8b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index c7d01c7..0acf746 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -195,3 +195,6 @@
 
     HDFS-8355. Erasure Coding: Refactor BlockInfo and BlockInfoUnderConstruction.
     (Tsz Wo Nicholas Sze via jing9)
+
+    HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread).
+    (Zhe Zhang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e83d1b8b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 7425e75..7678fae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -21,15 +21,27 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.ByteBufferPool;
 
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 
 import org.apache.hadoop.io.erasurecode.ECSchema;
+
 import org.apache.hadoop.net.NetUtils;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -37,10 +49,12 @@ import org.apache.htrace.TraceScope;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 import java.util.Set;
+import java.util.Collection;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.concurrent.CompletionService;
@@ -51,7 +65,6 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
-
 /******************************************************************************
  * DFSStripedInputStream reads from striped block groups, illustrated below:
  *
@@ -125,6 +138,7 @@ public class DFSStripedInputStream extends DFSInputStream {
   private final short parityBlkNum;
   /** the buffer for a complete stripe */
   private ByteBuffer curStripeBuf;
+  private final ECSchema schema;
   /**
    * indicate the start/end offset of the current buffered stripe in the
    * block group
@@ -137,6 +151,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     super(dfsClient, src, verifyChecksum);
 
     assert schema != null;
+    this.schema = schema;
     cellSize = schema.getChunkSize();
     dataBlkNum = (short) schema.getNumDataUnits();
     parityBlkNum = (short) schema.getNumParityUnits();
@@ -472,12 +487,10 @@ public class DFSStripedInputStream extends DFSInputStream {
    */
   @Override
   protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
-    LocatedBlock lb = super.getBlockAt(blkStartOffset);
-    assert lb instanceof LocatedStripedBlock : "NameNode should return a " +
-        "LocatedStripedBlock for a striped file";
+    LocatedBlock lb = getBlockGroupAt(blkStartOffset);
 
-    int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
-        % dataBlkNum);
+    int idx = (int) ((blkStartOffset - lb.getStartOffset())
+        % (dataBlkNum + parityBlkNum));
     // If indexing information is returned, iterate through the index array
     // to find the entry for position idx in the group
     LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
@@ -509,48 +522,121 @@ public class DFSStripedInputStream extends DFSInputStream {
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    Map<Future<Void>, Integer> futures = new HashMap<>();
-    CompletionService<Void> stripedReadsService =
-        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
-    int len = (int) (end - start + 1);
-
     // Refresh the striped block group
     LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
 
+    AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, blockGroup,
+        start, end, buf, offset);
+    for (AlignedStripe stripe : stripes) {
+      fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap);
+    }
+  }
 
-    // Planning the portion of I/O for each shard
-    ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
-        len, offset);
-
+  private void fetchOneStripe(LocatedStripedBlock blockGroup,
+      byte[] buf, AlignedStripe alignedStripe, Map<ExtendedBlock,
+      Set<DatanodeInfo>> corruptedBlockMap) throws IOException {
+    Map<Future<Void>, Integer> futures = new HashMap<>();
+    CompletionService<Void> service =
+        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+    if (alignedStripe.getSpanInBlock() == 0) {
+      DFSClient.LOG.warn("Trying to read an empty stripe from" + blockGroup);
+      return;
+    }
     // Parse group to get chosen DN location
     LocatedBlock[] blks = StripedBlockUtil.
         parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);
-
     for (short i = 0; i < dataBlkNum; i++) {
-      ReadPortion rp = readPortions[i];
-      if (rp.getReadLength() <= 0) {
-        continue;
+      if (alignedStripe.chunks[i] != null
+          && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+        fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
+            corruptedBlockMap);
       }
-      DatanodeInfo loc = blks[i].getLocations()[0];
-      StorageType type = blks[i].getStorageTypes()[0];
-      DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
-          loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
-          type);
-      Callable<Void> readCallable = getFromOneDataNode(dnAddr,
-          blks[i].getStartOffset(), rp.getStartOffsetInBlock(),
-          rp.getStartOffsetInBlock() + rp.getReadLength() - 1, buf,
-          rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
-      Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
-      DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
-      futures.put(getFromDNRequest, (int) i);
     }
+    // Input buffers for a potential decode operation; they remain null until
+    // the first read failure
+    byte[][] decodeInputs = null;
     while (!futures.isEmpty()) {
       try {
-        waitNextCompletion(stripedReadsService, futures);
+        StripingChunkReadResult r = getNextCompletedStripedRead(
+            service, futures, 0);
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + alignedStripe);
+        }
+        StripingChunk returnedChunk = alignedStripe.chunks[r.index];
+        Preconditions.checkNotNull(returnedChunk);
+        Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
+        if (r.state == StripingChunkReadResult.SUCCESSFUL) {
+          returnedChunk.state = StripingChunk.FETCHED;
+          alignedStripe.fetchedChunksNum++;
+          if (alignedStripe.fetchedChunksNum == dataBlkNum) {
+            clearFutures(futures.keySet());
+            break;
+          }
+        } else {
+          returnedChunk.state = StripingChunk.MISSING;
+          alignedStripe.missingChunksNum++;
+          if (alignedStripe.missingChunksNum > parityBlkNum) {
+            clearFutures(futures.keySet());
+            throw new IOException("Too many blocks are missing: " + alignedStripe);
+          }
+          // On the first missing block, initialize the decode input buffers
+          if (decodeInputs == null) {
+            decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
+          }
+          for (int i = 0; i < alignedStripe.chunks.length; i++) {
+            StripingChunk chunk = alignedStripe.chunks[i];
+            Preconditions.checkNotNull(chunk);
+            if (chunk.state == StripingChunk.REQUESTED && i <= dataBlkNum) {
+              fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
+                  corruptedBlockMap);
+            }
+          }
+        }
       } catch (InterruptedException ie) {
-        // Ignore and retry
+        String err = "Read request interrupted";
+        DFSClient.LOG.error(err);
+        clearFutures(futures.keySet());
+        // Don't decode if read interrupted
+        throw new InterruptedIOException(err);
       }
     }
+
+    if (alignedStripe.missingChunksNum > 0) {
+      decodeAndFillBuffer(decodeInputs, buf, alignedStripe,
+          dataBlkNum, parityBlkNum);
+    }
+  }
+
+  /**
+   * Schedule a single read request to an internal block
+   * @param block The internal block
+   * @param index Index of the internal block in the group
+   * @param corruptedBlockMap Map of corrupted blocks
+   */
+  private void fetchOneStripingChunk(Map<Future<Void>, Integer> futures,
+      final CompletionService<Void> service, final LocatedBlock block,
+      final AlignedStripe alignedStripe, final int index,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    DatanodeInfo loc = block.getLocations()[0];
+    StorageType type = block.getStorageTypes()[0];
+    DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
+        loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
+        type);
+    StripingChunk chunk = alignedStripe.chunks[index];
+    chunk.state = StripingChunk.PENDING;
+    Callable<Void> readCallable = getFromOneDataNode(dnAddr,
+        block.getStartOffset(), alignedStripe.getOffsetInBlock(),
+        alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1, chunk.buf,
+        chunk.getOffsets(), chunk.getLengths(),
+        corruptedBlockMap, index);
+    Future<Void> getFromDNRequest = service.submit(readCallable);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Submitting striped read request for " + index +
+          ". Info of the block: " + block + ", offset in block is " +
+          alignedStripe.getOffsetInBlock() + ", end is " +
+          (alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1));
+    }
+    futures.put(getFromDNRequest, index);
   }
 
   private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
@@ -609,4 +695,12 @@ public class DFSStripedInputStream extends DFSInputStream {
     throw new UnsupportedOperationException(
         "Not support enhanced byte buffer access.");
   }
+
+  /** A variation of {@link DFSInputStream#cancelAll} */
+  private void clearFutures(Collection<Future<Void>> futures) {
+    for (Future<Void> future : futures) {
+      future.cancel(false);
+    }
+    futures.clear();
+  }
 }
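
For readers tracing the new pread path: fetchOneStripe() above schedules reads
for the data chunks of one AlignedStripe and falls back to parity reads and
decoding only after the first failure. Below is a minimal, self-contained
sketch of that retry loop (not the committed code; the class, method, and
constant names are illustrative, and the submitted callables merely stand in
for real block reads):

    import java.io.IOException;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;

    public class StripeReadSketch {
      static final int DATA_BLK_NUM = 6, PARITY_BLK_NUM = 3;

      // Read one aligned stripe: schedule all data chunks first; schedule one
      // extra (parity) chunk only after a read fails. Returns true if no
      // decode is needed, i.e. every data chunk was fetched directly.
      static boolean readStripe(ExecutorService pool) throws Exception {
        CompletionService<Integer> service =
            new ExecutorCompletionService<>(pool);
        int pending = 0;
        for (int i = 0; i < DATA_BLK_NUM; i++) {
          final int idx = i;
          service.submit(() -> idx);   // stand-in for a real chunk read
          pending++;
        }
        int fetched = 0, missing = 0;
        while (fetched < DATA_BLK_NUM && pending > 0) {
          try {
            service.take().get();      // like getNextCompletedStripedRead(.., 0)
            fetched++;
          } catch (ExecutionException e) {
            missing++;
            if (missing > PARITY_BLK_NUM) {
              throw new IOException("Too many blocks are missing");
            }
            // each failure schedules one more chunk read, from parity
            final int parityIdx = DATA_BLK_NUM + missing - 1;
            service.submit(() -> parityIdx);
            pending++;
          }
          pending--;
        }
        return missing == 0;  // otherwise run decodeAndFillBuffer-style logic
      }
    }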

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e83d1b8b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 5ede508..eedb191 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripedReadResult;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
@@ -462,10 +462,10 @@ public final class ErasureCodingWorker {
       int nsuccess = 0;
       while (!futures.isEmpty()) {
         try {
-          StripedReadResult result = 
+          StripingChunkReadResult result =
               StripedBlockUtil.getNextCompletedStripedRead(
                   readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
-          if (result.state == StripedReadResult.SUCCESSFUL) {
+          if (result.state == StripingChunkReadResult.SUCCESSFUL) {
             success[nsuccess++] = result.index;
             if (nsuccess >= dataBlkNum) {
               // cancel remaining reads if we read successfully from minimum
@@ -474,14 +474,14 @@ public final class ErasureCodingWorker {
               futures.clear();
               break;
             }
-          } else if (result.state == StripedReadResult.FAILED) {
+          } else if (result.state == StripingChunkReadResult.FAILED) {
             // If read failed for some source, we should not use it anymore 
             // and schedule read from a new source.
             StripedReader failedReader = stripedReaders.get(result.index);
             closeBlockReader(failedReader.blockReader);
             failedReader.blockReader = null;
             scheduleNewRead(used);
-          } else if (result.state == StripedReadResult.TIMEOUT) {
+          } else if (result.state == StripingChunkReadResult.TIMEOUT) {
             // If timeout, we also schedule a new read.
             scheduleNewRead(used);
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e83d1b8b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 45bbf6b..f7ae88a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -22,16 +22,18 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSStripedOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -85,7 +87,7 @@ public class StripedBlockUtil {
         new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
         new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
         new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
-        bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
+        bg.getStartOffset() + idxInBlockGroup, bg.isCorrupt(),
         null);
   }
 
@@ -238,33 +240,37 @@ public class StripedBlockUtil {
   /**
    * Get the next completed striped read task
    *
-   * @return {@link StripedReadResult} indicating the status of the read task
+   * @return {@link StripingChunkReadResult} indicating the status of the read task
   *          and the block index of the task. If the method times out
   *          without getting any completed read task, -1 is returned as
   *          the block index.
    * @throws InterruptedException
    */
-  public static StripedReadResult getNextCompletedStripedRead(
+  public static StripingChunkReadResult getNextCompletedStripedRead(
       CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
       final long threshold) throws InterruptedException {
     Preconditions.checkArgument(!futures.isEmpty());
-    Preconditions.checkArgument(threshold > 0);
     Future<Void> future = null;
     try {
-      future = readService.poll(threshold, TimeUnit.MILLISECONDS);
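+      // A non-positive threshold means block indefinitely for the next
+      // completed read; a positive threshold polls with a timeout.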
+      if (threshold > 0) {
+        future = readService.poll(threshold, TimeUnit.MILLISECONDS);
+      } else {
+        future = readService.take();
+      }
       if (future != null) {
         future.get();
-        return new StripedReadResult(futures.remove(future),
-            StripedReadResult.SUCCESSFUL);
+        return new StripingChunkReadResult(futures.remove(future),
+            StripingChunkReadResult.SUCCESSFUL);
       } else {
-        return new StripedReadResult(StripedReadResult.TIMEOUT);
+        return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
     } catch (ExecutionException e) {
-      return new StripedReadResult(futures.remove(future),
-          StripedReadResult.FAILED);
+      DFSClient.LOG.error("ExecutionException in striped read: " + e);
+      return new StripingChunkReadResult(futures.remove(future),
+          StripingChunkReadResult.FAILED);
     } catch (CancellationException e) {
-      return new StripedReadResult(futures.remove(future),
-          StripedReadResult.CANCELLED);
+      return new StripingChunkReadResult(futures.remove(future),
+          StripingChunkReadResult.CANCELLED);
     }
   }
 
@@ -291,26 +297,247 @@ public class StripedBlockUtil {
   }
 
   /**
-   * This class represents the portion of I/O associated with each block in the
-   * striped block group.
+   * Initialize the decoding input buffers based on the chunk states in an
+   * AlignedStripe
    */
-  public static class ReadPortion {
+  public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
+      int dataBlkNum, int parityBlkNum) {
+    byte[][] decodeInputs =
+        new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
+    for (int i = 0; i < alignedStripe.chunks.length; i++) {
+      StripingChunk chunk = alignedStripe.chunks[i];
+      if (chunk == null) {
+        alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
+        alignedStripe.chunks[i].offsetsInBuf.add(0);
+        alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock());
+      } else if (chunk.state == StripingChunk.FETCHED) {
+        int posInBuf = 0;
+        for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
+          System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j),
+              decodeInputs[i], posInBuf, chunk.lengthsInBuf.get(j));
+          posInBuf += chunk.lengthsInBuf.get(j);
+        }
+      } else if (chunk.state == StripingChunk.ALLZERO) {
+        Arrays.fill(decodeInputs[i], (byte)0);
+      }
+    }
+    return decodeInputs;
+  }
+
+  /**
+   * Decode based on the given input buffers and schema
+   */
+  public static void decodeAndFillBuffer(final byte[][] decodeInputs, byte[] buf,
+      AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum) {
+    int[] decodeIndices = new int[parityBlkNum];
+    int pos = 0;
+    for (int i = 0; i < alignedStripe.chunks.length; i++) {
+      if (alignedStripe.chunks[i].state != StripingChunk.FETCHED &&
+          alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+        decodeIndices[pos++] = i;
+      }
+    }
+
+    byte[][] outputs = new byte[parityBlkNum][(int) alignedStripe.getSpanInBlock()];
+    RSRawDecoder rsRawDecoder = new RSRawDecoder();
+    rsRawDecoder.initialize(dataBlkNum, parityBlkNum, (int) alignedStripe.getSpanInBlock());
+    rsRawDecoder.decode(decodeInputs, decodeIndices, outputs);
+
+    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+      StripingChunk chunk = alignedStripe.chunks[i];
+      if (chunk.state == StripingChunk.MISSING) {
+        int srcPos = 0;
+        for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
+          //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
+//          System.arraycopy(outputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
+//              chunk.lengthsInBuf.get(j));
+          Arrays.fill(buf, chunk.offsetsInBuf.get(j),
+              chunk.offsetsInBuf.get(j) + chunk.lengthsInBuf.get(j), (byte)7);
+          srcPos += chunk.lengthsInBuf.get(j);
+        }
+      }
+    }
+  }
+
+  /**
+   * This method divides a requested byte range into an array of
+   * {@link AlignedStripe}.
+   *
+   * At most 5 stripes will be generated from each logical range.
+   * TODO: cleanup and get rid of planReadPortions
+   */
+  public static AlignedStripe[] divideByteRangeIntoStripes(
+      ECSchema ecSchema, LocatedStripedBlock blockGroup, long start, long end,
+      byte[] buf, int offsetInBuf) {
+    // TODO: change ECSchema naming to use cell size instead of chunk size
+
+    // Step 0: analyze range and calculate basic parameters
+    int cellSize = ecSchema.getChunkSize();
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int len = (int) (end - start + 1);
+    int firstCellIdxInBG = (int) (start / cellSize);
+    int lastCellIdxInBG = (int) (end / cellSize);
+    int firstCellSize = Math.min(cellSize - (int) (start % cellSize), len);
+    long firstCellOffsetInBlk = start % cellSize;
+    int lastCellSize = lastCellIdxInBG == firstCellIdxInBG ?
+        firstCellSize : (int) (end % cellSize) + 1;
+
+    // Step 1: get the unmerged ranges on each internal block
+    // TODO: StripingCell should carry info on size and start offset (HDFS-8320)
+    VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema,
+        firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
+        lastCellSize);
+
+    // Step 2: merge into at most 5 stripes
+    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
+
+    // Step 3: calculate each chunk's position in destination buffer
+    calculateChunkPositionsInBuf(ecSchema, blockGroup, buf, offsetInBuf,
+        firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
+        lastCellSize, stripes);
+
+    // Step 4: prepare ALLZERO blocks
+    prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
+
+    return stripes;
+  }
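+
+  /*
+   * Worked example (illustrative, not from the patch): with cellSize = 64K,
+   * a range of start = 10 and end = 128K - 1 gives firstCellIdxInBG = 0,
+   * lastCellIdxInBG = 1, firstCellSize = 64K - 10, firstCellOffsetInBlk = 10,
+   * and lastCellSize = 64K (the range ends exactly at the end of cell 1).
+   */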
+
+  private static VerticalRange[] getRangesForInternalBlocks(ECSchema ecSchema,
+      int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
+      long firstCellOffsetInBlk, int lastCellSize) {
+    int cellSize = ecSchema.getChunkSize();
+    int dataBlkNum = ecSchema.getNumDataUnits();
+
+    StripingCell firstCell = new StripingCell(ecSchema, firstCellIdxInBG);
+    StripingCell lastCell = new StripingCell(ecSchema, lastCellIdxInBG);
+
+    VerticalRange[] ranges = new VerticalRange[dataBlkNum];
+    ranges[firstCell.idxInStripe] =
+        new VerticalRange(firstCellOffsetInBlk, firstCellSize);
+    for (int i = firstCellIdxInBG + 1; i < lastCellIdxInBG; i++) {
+      // iterate through all cells and update the list of StripeRanges
+      StripingCell cell = new StripingCell(ecSchema, i);
+      if (ranges[cell.idxInStripe] == null) {
+        ranges[cell.idxInStripe] = new VerticalRange(
+            cell.idxInInternalBlk * cellSize, cellSize);
+      } else {
+        ranges[cell.idxInStripe].spanInBlock += cellSize;
+      }
+    }
+    if (ranges[lastCell.idxInStripe] == null) {
+      ranges[lastCell.idxInStripe] = new VerticalRange(
+          lastCell.idxInInternalBlk * cellSize, lastCellSize);
+    } else if (lastCell.idxInBlkGroup != firstCell.idxInBlkGroup) {
+      ranges[lastCell.idxInStripe].spanInBlock += lastCellSize;
+    }
+
+    return ranges;
+  }
+
+  private static AlignedStripe[] mergeRangesForInternalBlocks(ECSchema ecSchema,
+      VerticalRange[] ranges) {
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNum = ecSchema.getNumParityUnits();
+    List<AlignedStripe> stripes = new ArrayList<>();
+    SortedSet<Long> stripePoints = new TreeSet<>();
+    for (VerticalRange r : ranges) {
+      if (r != null) {
+        stripePoints.add(r.offsetInBlock);
+        stripePoints.add(r.offsetInBlock + r.spanInBlock);
+      }
+    }
+
+    long prev = -1;
+    for (long point : stripePoints) {
+      if (prev >= 0) {
+        stripes.add(new AlignedStripe(prev, point - prev,
+            dataBlkNum + parityBlkNum));
+      }
+      prev = point;
+    }
+    return stripes.toArray(new AlignedStripe[stripes.size()]);
+  }
+
+  private static void calculateChunkPositionsInBuf(ECSchema ecSchema,
+      LocatedStripedBlock blockGroup, byte[] buf, int offsetInBuf,
+      int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
+      long firstCellOffsetInBlk, int lastCellSize, AlignedStripe[] stripes) {
+    int cellSize = ecSchema.getChunkSize();
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    // Step 3: calculate each chunk's position in destination buffer
     /**
-     * startOffsetInBlock
-     *     |
-     *     v
-     *     |<-lengths[0]->|<-  lengths[1]  ->|<-lengths[2]->|
+     *     | <--------------- AlignedStripe --------------->|
+     *
+     *     |<- length_0 ->|<--  length_1  -->|<- length_2 ->|
      * +------------------+------------------+----------------+
-     * |      cell_0      |      cell_3      |     cell_6     |  <- blk_0
+     * |    cell_0_0_0    |    cell_3_1_0    |   cell_6_2_0   |  <- blk_0
      * +------------------+------------------+----------------+
      *   _/                \_______________________
      *  |                                          |
-     *  v offsetsInBuf[0]                          v offsetsInBuf[1]
-     * +------------------------------------------------------+
-     * |  cell_0     |      cell_1 and cell_2      |cell_3 ...|   <- buf
-     * |  (partial)  |    (from blk_1 and blk_2)   |          |
-     * +------------------------------------------------------+
+     *  v offset_0                                 v offset_1
+     * +----------------------------------------------------------+
+     * |  cell_0_0_0 |  cell_1_0_1 and cell_2_0_2  |cell_3_1_0 ...|   <- buf
+     * |  (partial)  |    (from blk_1 and blk_2)   |              |
+     * +----------------------------------------------------------+
+     *
+     * Cell indexing convention defined in {@link StripingCell}
      */
+    int done = 0;
+    for (int i = firstCellIdxInBG; i <= lastCellIdxInBG; i++) {
+      StripingCell cell = new StripingCell(ecSchema, i);
+      long cellStart = i == firstCellIdxInBG ?
+          firstCellOffsetInBlk : cell.idxInInternalBlk * cellSize;
+      int cellLen;
+      if (i == firstCellIdxInBG) {
+        cellLen = firstCellSize;
+      } else if (i == lastCellIdxInBG) {
+        cellLen = lastCellSize;
+      } else {
+        cellLen = cellSize;
+      }
+      long cellEnd = cellStart + cellLen - 1;
+      for (AlignedStripe s : stripes) {
+        long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
+        long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
+        long overlapEnd = Math.min(cellEnd, stripeEnd);
+        int overlapLen = (int) (overlapEnd - overlapStart + 1);
+        if (overlapLen <= 0) {
+          continue;
+        }
+        if (s.chunks[cell.idxInStripe] == null) {
+          s.chunks[cell.idxInStripe] = new StripingChunk(buf);
+        }
+
+        s.chunks[cell.idxInStripe].offsetsInBuf.
+            add((int)(offsetInBuf + done + overlapStart - cellStart));
+        s.chunks[cell.idxInStripe].lengthsInBuf.add(overlapLen);
+      }
+      done += cellLen;
+    }
+  }
+
+  private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
+      byte[] buf, AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
+    for (AlignedStripe s : stripes) {
+      for (int i = 0; i < dataBlkNum; i++) {
+        long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(),
+            cellSize, dataBlkNum, i);
+        if (internalBlkLen <= s.getOffsetInBlock()) {
+          Preconditions.checkState(s.chunks[i] == null);
+          s.chunks[i] = new StripingChunk(buf);
+          s.chunks[i].state = StripingChunk.ALLZERO;
+        }
+      }
+    }
+  }
+
+  /**
+   * This class represents the portion of I/O associated with each block in the
+   * striped block group.
+   * TODO: consolidate ReadPortion with AlignedStripe
+   */
+  public static class ReadPortion {
     private long startOffsetInBlock = 0;
     private int readLength = 0;
     public final List<Integer> offsetsInBuf = new ArrayList<>();
@@ -350,11 +577,234 @@ public class StripedBlockUtil {
   }
 
   /**
+   * The unit of encoding used in {@link DFSStripedOutputStream}
+   *  | <------- Striped Block Group -------> |
+   *    blk_0          blk_1          blk_2
+   *      |              |              |
+   *      v              v              v
+   * +----------+   +----------+   +----------+
+   * |cell_0_0_0|   |cell_1_0_1|   |cell_2_0_2|
+   * +----------+   +----------+   +----------+
+   * |cell_3_1_0|   |cell_4_1_1|   |cell_5_1_2| <- {@link idxInBlkGroup} = 5
+   * +----------+   +----------+   +----------+    {@link idxInInternalBlk} = 1
+   *                                               {@link idxInStripe} = 2
+   * A StripingCell is a special instance of {@link StripingChunk} whose offset
+   * and size align with the cell used when writing data.
+   * TODO: consider parity cells
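+   *
+   * Worked example (illustrative): with 3 data units as drawn above,
+   * idxInBlkGroup = 5 gives idxInInternalBlk = 5 / 3 = 1 and
+   * idxInStripe = 5 - 1 * 3 = 2, matching cell_5_1_2 in the diagram.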
+   */
+  public static class StripingCell {
+    public final ECSchema schema;
+    /** Logical order in a block group, used when doing I/O to a block group */
+    public final int idxInBlkGroup;
+    public final int idxInInternalBlk;
+    public final int idxInStripe;
+
+    public StripingCell(ECSchema ecSchema, int idxInBlkGroup) {
+      this.schema = ecSchema;
+      this.idxInBlkGroup = idxInBlkGroup;
+      this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits();
+      this.idxInStripe = idxInBlkGroup -
+          this.idxInInternalBlk * ecSchema.getNumDataUnits();
+    }
+
+    public StripingCell(ECSchema ecSchema, int idxInInternalBlk,
+        int idxInStripe) {
+      this.schema = ecSchema;
+      this.idxInInternalBlk = idxInInternalBlk;
+      this.idxInStripe = idxInStripe;
+      this.idxInBlkGroup =
+          idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe;
+    }
+  }
+
+  /**
+   * Given a requested byte range on a striped block group, an AlignedStripe
+   * represents a {@link VerticalRange} that is aligned with both the byte range
+   * and boundaries of all internal blocks. As illustrated in the diagram, any
+   * given byte range on a block group leads to one to five AlignedStripes.
+   *
+   * |<-------- Striped Block Group -------->|
+   * blk_0   blk_1   blk_2      blk_3   blk_4
+   *                 +----+  |  +----+  +----+
+   *                 |full|  |  |    |  |    | <- AlignedStripe0:
+   *         +----+  |~~~~|  |  |~~~~|  |~~~~|      1st cell is partial
+   *         |part|  |    |  |  |    |  |    | <- AlignedStripe1: byte range
+   * +----+  +----+  +----+  |  |~~~~|  |~~~~|      doesn't start at 1st block
+   * |full|  |full|  |full|  |  |    |  |    |
+   * |cell|  |cell|  |cell|  |  |    |  |    | <- AlignedStripe2 (full stripe)
+   * |    |  |    |  |    |  |  |    |  |    |
+   * +----+  +----+  +----+  |  |~~~~|  |~~~~|
+   * |full|  |part|          |  |    |  |    | <- AlignedStripe3: byte range
+   * |~~~~|  +----+          |  |~~~~|  |~~~~|      doesn't end at last block
+   * |    |                  |  |    |  |    | <- AlignedStripe4:
+   * +----+                  |  +----+  +----+      last cell is partial
+   *                         |
+   * <---- data blocks ----> | <--- parity --->
+   *
+   * An AlignedStripe is the basic unit of reading from a striped block group,
+   * because within the AlignedStripe, all internal blocks can be processed in
+   * a uniform manner.
+   *
+   * The coverage of an AlignedStripe on an internal block is represented as a
+   * {@link StripingChunk}.
+   * To simplify the logic of reading a logical byte range from a block group,
+   * a StripingChunk is either completely in the requested byte range or
+   * completely outside the requested byte range.
+   */
+  public static class AlignedStripe {
+    public VerticalRange range;
+    /** status of each chunk in the stripe */
+    public final StripingChunk[] chunks;
+    public int fetchedChunksNum = 0;
+    public int missingChunksNum = 0;
+
+    public AlignedStripe(long offsetInBlock, long length, int width) {
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      this.range = new VerticalRange(offsetInBlock, length);
+      this.chunks = new StripingChunk[width];
+    }
+
+    public AlignedStripe(VerticalRange range, int width) {
+      this.range = range;
+      this.chunks = new StripingChunk[width];
+    }
+
+    public boolean include(long pos) {
+      return range.include(pos);
+    }
+
+    public long getOffsetInBlock() {
+      return range.offsetInBlock;
+    }
+
+    public long getSpanInBlock() {
+      return range.spanInBlock;
+    }
+
+    @Override
+    public String toString() {
+      return "Offset=" + range.offsetInBlock + ", length=" + range.spanInBlock +
+          ", fetchedChunksNum=" + fetchedChunksNum +
+          ", missingChunksNum=" + missingChunksNum;
+    }
+  }
+
+  /**
+   * A simple utility class representing an arbitrary vertical inclusive range
+   * starting at {@link offsetInBlock} and spanning {@link spanInBlock} bytes in
+   * an internal block. Note that VerticalRange doesn't necessarily align with
+   * {@link StripingCell}.
+   *
+   * |<- Striped Block Group ->|
+   *  blk_0
+   *    |
+   *    v
+   * +-----+
+   * |~~~~~| <-- {@link offsetInBlock}
+   * |     |  ^
+   * |     |  |
+   * |     |  | {@link spanInBlock}
+   * |     |  v
+   * |~~~~~| ---
+   * |     |
+   * +-----+
+   */
+  public static class VerticalRange {
+    /** start offset in the block group (inclusive) */
+    public long offsetInBlock;
+    /** length of the stripe range */
+    public long spanInBlock;
+
+    public VerticalRange(long offsetInBlock, long length) {
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      this.offsetInBlock = offsetInBlock;
+      this.spanInBlock = length;
+    }
+
+    /** whether a position is in the range */
+    public boolean include(long pos) {
+      return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
+    }
+  }
+
+  /**
+   * Indicates the coverage of an {@link AlignedStripe} on an internal block,
+   * and the state of the chunk in the context of the read request.
+   *
+   * |<---------------- Striped Block Group --------------->|
+   *   blk_0        blk_1        blk_2          blk_3   blk_4
+   *                           +---------+  |  +----+  +----+
+   *     null         null     |REQUESTED|  |  |null|  |null| <- AlignedStripe0
+   *              +---------+  |---------|  |  |----|  |----|
+   *     null     |REQUESTED|  |REQUESTED|  |  |null|  |null| <- AlignedStripe1
+   * +---------+  +---------+  +---------+  |  +----+  +----+
+   * |REQUESTED|  |REQUESTED|    ALLZERO    |  |null|  |null| <- AlignedStripe2
+   * +---------+  +---------+               |  +----+  +----+
+   * <----------- data blocks ------------> | <--- parity --->
+   *
+   * The class also carries {@link buf}, {@link offsetsInBuf}, and
+   * {@link lengthsInBuf} to define how the read task for this chunk
+   * should deliver the returned data.
+   */
+  public static class StripingChunk {
+    /** Chunk has been successfully fetched */
+    public static final int FETCHED = 0x01;
+    /** Chunk encountered a failure while being fetched */
+    public static final int MISSING = 0x02;
+    /** Chunk being fetched (fetching task is in-flight) */
+    public static final int PENDING = 0x04;
+    /**
+     * Chunk is requested either by application or for decoding, need to
+     * schedule read task
+     */
+    public static final int REQUESTED = 0x08;
+    /**
+     * The internal block is short and has no overlap with the chunk. The
+     * chunk is treated as all-zero bytes in codec calculations.
+     */
+    public static final int ALLZERO = 0x0f;
+
+    /**
+     * If a chunk is completely in the requested range, the state transition is:
+     * REQUESTED (when AlignedStripe created) -> PENDING -> {FETCHED | MISSING}
+     * If a chunk is completely outside the requested range (including parity
+     * chunks), the state transition is:
+     * null (when AlignedStripe created) -> REQUESTED (upon failure) -> PENDING ...
+     */
+    public int state = REQUESTED;
+    public byte[] buf;
+    public List<Integer> offsetsInBuf;
+    public List<Integer> lengthsInBuf;
+
+    public StripingChunk(byte[] buf) {
+      this.buf = buf;
+      this.offsetsInBuf = new ArrayList<>();
+      this.lengthsInBuf = new ArrayList<>();
+    }
+
+    public int[] getOffsets() {
+      int[] offsets = new int[offsetsInBuf.size()];
+      for (int i = 0; i < offsets.length; i++) {
+        offsets[i] = offsetsInBuf.get(i);
+      }
+      return offsets;
+    }
+
+    public int[] getLengths() {
+      int[] lens = new int[this.lengthsInBuf.size()];
+      for (int i = 0; i < lens.length; i++) {
+        lens[i] = this.lengthsInBuf.get(i);
+      }
+      return lens;
+    }
+  }
+
+  /**
   * This class represents the result of a striped read request.
    * If the task was successful or the internal computation failed,
    * an index is also returned.
    */
-  public static class StripedReadResult {
+  public static class StripingChunkReadResult {
     public static final int SUCCESSFUL = 0x01;
     public static final int FAILED = 0x02;
     public static final int TIMEOUT = 0x04;
@@ -363,18 +813,23 @@ public class StripedBlockUtil {
     public final int index;
     public final int state;
 
-    public StripedReadResult(int state) {
+    public StripingChunkReadResult(int state) {
       Preconditions.checkArgument(state == TIMEOUT,
           "Only timeout result should return negative index.");
       this.index = -1;
       this.state = state;
     }
 
-    public StripedReadResult(int index, int state) {
+    public StripingChunkReadResult(int index, int state) {
       Preconditions.checkArgument(state != TIMEOUT,
           "Timeout result should return negative index.");
       this.index = index;
       this.state = state;
     }
+
+    @Override
+    public String toString() {
+      return "(index=" + index + ", state =" + state + ")";
+    }
   }
 }
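
The stripe-merging step in divideByteRangeIntoStripes deserves a quick
illustration: mergeRangesForInternalBlocks collects the start and end offsets
of every per-block VerticalRange into a sorted set, and every pair of adjacent
points becomes one AlignedStripe. A standalone sketch of the same technique,
using a simplified Range class instead of the patch's types:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class StripeMergeSketch {
      // Simplified stand-in for VerticalRange / AlignedStripe
      static class Range {
        final long offset, span;
        Range(long offset, long span) { this.offset = offset; this.span = span; }
        @Override public String toString() {
          return "[" + offset + ", " + (offset + span) + ")";
        }
      }

      static List<Range> merge(Range[] perBlockRanges) {
        // collect all range boundaries as candidate stripe boundaries
        SortedSet<Long> points = new TreeSet<>();
        for (Range r : perBlockRanges) {
          if (r != null) {
            points.add(r.offset);
            points.add(r.offset + r.span);
          }
        }
        // each pair of adjacent boundary points forms one aligned stripe
        List<Range> stripes = new ArrayList<>();
        long prev = -1;
        for (long p : points) {
          if (prev >= 0) {
            stripes.add(new Range(prev, p - prev));
          }
          prev = p;
        }
        return stripes;
      }

      public static void main(String[] args) {
        // ranges [0, 100) and [50, 150) merge into [0, 50), [50, 100), [100, 150)
        System.out.println(merge(
            new Range[] {new Range(0, 100), new Range(50, 100)}));
      }
    }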

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e83d1b8b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 3f79933..452cc2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -133,8 +134,102 @@ public class TestDFSStripedInputStream {
     byte[] readBuffer = new byte[readSize];
     int ret = in.read(0, readBuffer, 0, readSize);
 
+    byte[] expected = new byte[readSize];
+    /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+    for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        for (int k = 0; k < CELLSIZE; k++) {
+          int posInBlk = i * CELLSIZE + k;
+          int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+          expected[posInFile] = SimulatedFSDataset.simulatedByte(
+              new Block(bg.getBlock().getBlockId() + j), posInBlk);
+        }
+      }
+    }
+
     assertEquals(readSize, ret);
-    // TODO: verify read results with patterned data from HDFS-8117
+    assertArrayEquals(expected, readBuffer);
+  }
+
+  @Test
+  public void testPreadWithDNFailure() throws Exception {
+    final int numBlocks = 4;
+    final int failedDNIdx = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE);
+
+    assert lbs.get(0) instanceof LocatedStripedBlock;
+    LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+    for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
+      Block blk = new Block(bg.getBlock().getBlockId() + i,
+          NUM_STRIPE_PER_BLOCK * CELLSIZE,
+          bg.getBlock().getGenerationStamp());
+      blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+      cluster.injectBlocks(i, Arrays.asList(blk),
+          bg.getBlock().getBlockPoolId());
+    }
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
+            ErasureCodingSchemaManager.getSystemDefaultSchema());
+    int readSize = BLOCK_GROUP_SIZE;
+    byte[] readBuffer = new byte[readSize];
+    byte[] expected = new byte[readSize];
+    cluster.stopDataNode(failedDNIdx);
+    /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+    for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        for (int k = 0; k < CELLSIZE; k++) {
+          int posInBlk = i * CELLSIZE + k;
+          int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+          expected[posInFile] = SimulatedFSDataset.simulatedByte(
+              new Block(bg.getBlock().getBlockId() + j), posInBlk);
+        }
+      }
+    }
+
+    // Update the expected content for decoded data
+    for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+      byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
+      int[] missingBlkIdx = new int[]{failedDNIdx, DATA_BLK_NUM+1, DATA_BLK_NUM+2};
+      byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE];
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
+        if (j != failedDNIdx) {
+          System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
+        }
+      }
+      for (int k = 0; k < CELLSIZE; k++) {
+        int posInBlk = i * CELLSIZE + k;
+        decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
+            new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
+      }
+//      RSRawDecoder rsRawDecoder = new RSRawDecoder();
+//      rsRawDecoder.initialize(DATA_BLK_NUM, PARITY_BLK_NUM, CELLSIZE);
+//      rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+      int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
+//      System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
+      //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
+      Arrays.fill(expected, posInBuf, posInBuf + CELLSIZE, (byte)7);
+    }
+    int delta = 10;
+    int done = 0;
+    // read a small delta, shouldn't trigger decode
+    // |cell_0 |
+    // |10     |
+    done += in.read(0, readBuffer, 0, delta);
+    assertEquals(delta, done);
+    // both head and trail cells are partial
+    // |c_0      |c_1    |c_2 |c_3 |c_4      |c_5         |
+    // |256K - 10|missing|256K|256K|256K - 10|not in range|
+    done += in.read(delta, readBuffer, delta,
+        CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
+    assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
+    // read the rest
+    done += in.read(done, readBuffer, done, readSize - done);
+    assertEquals(readSize, done);
+    assertArrayEquals(expected, readBuffer);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e83d1b8b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index 5c6f449..57d6eb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -321,4 +324,50 @@ public class TestWriteReadStripedFile {
       Assert.assertArrayEquals(bytes, result.array());
     }
   }
+
+  @Test
+  public void testWritePreadWithDNFailure() throws IOException {
+    final int failedDNIdx = 2;
+    final int length = cellSize * (dataBlocks + 2);
+    Path testPath = new Path("/foo");
+    final byte[] bytes = generateBytes(length);
+    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
+
+    // shut down the DN that holds the internal data block at failedDNIdx
+    BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
+        cellSize);
+    String name = (locs[0].getNames())[failedDNIdx];
+    for (DataNode dn : cluster.getDataNodes()) {
+      int port = dn.getXferPort();
+      if (name.contains(Integer.toString(port))) {
+        dn.shutdown();
+        break;
+      }
+    }
+
+    // pread
+    int startOffsetInFile = cellSize * 5;
+    try (FSDataInputStream fsdis = fs.open(testPath)) {
+      byte[] buf = new byte[length];
+      int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
+      Assert.assertEquals("The length of file should be the same to write size",
+          length - startOffsetInFile, readLen);
+
+      RSRawDecoder rsRawDecoder = new RSRawDecoder();
+      rsRawDecoder.initialize(dataBlocks, parityBlocks, 1);
+      byte[] expected = new byte[readLen];
+      for (int i = startOffsetInFile; i < length; i++) {
+        //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
+        if ((i / cellSize) % dataBlocks == failedDNIdx) {
+          expected[i - startOffsetInFile] = (byte)7;
+        } else {
+          expected[i - startOffsetInFile] = getByte(i);
+        }
+      }
+      for (int i = startOffsetInFile; i < length; i++) {
+        Assert.assertEquals("Byte at " + i + " should be the same",
+            expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
+      }
+    }
+  }
 }