You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/19 07:55:45 UTC

hadoop git commit: HDFS-8320. Erasure coding: consolidate striping-related terminologies. Contributed by Zhe Zhang and Jing Zhao.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 d879a4771 -> 12d030bef


HDFS-8320. Erasure coding: consolidate striping-related terminologies. Contributed by Zhe Zhang and Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/12d030be
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/12d030be
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/12d030be

Branch: refs/heads/HDFS-7285
Commit: 12d030beff2019c090fddbebeae3d4127f0c0409
Parents: d879a47
Author: Zhe Zhang <zh...@apache.org>
Authored: Mon May 18 22:55:27 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 18 22:55:27 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   2 +
 .../hadoop/hdfs/DFSStripedInputStream.java      |  46 +-
 .../erasurecode/ErasureCodingWorker.java        |   4 +-
 .../hadoop/hdfs/util/StripedBlockUtil.java      | 419 +++++++++----------
 .../hadoop/hdfs/TestPlanReadPortions.java       | 143 -------
 .../hadoop/hdfs/TestRecoverStripedFile.java     |   4 +-
 .../hadoop/hdfs/util/TestStripedBlockUtil.java  | 196 ++++++++-
 7 files changed, 400 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/12d030be/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 1549930..3170e9b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -223,3 +223,5 @@
 
     HDFS-8418. Fix the isNeededReplication calculation for Striped block in NN.
     (Yi Liu via jing9)
+
+    HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/12d030be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 8f15eda..744d586 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -23,19 +23,18 @@ import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.ByteBufferPool;
 
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
@@ -65,30 +64,9 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
-/******************************************************************************
- * DFSStripedInputStream reads from striped block groups, illustrated below:
- *
- * | <- Striped Block Group -> |
- *  blk_0      blk_1       blk_2   <- A striped block group has
- *    |          |           |          {@link #dataBlkNum} blocks
- *    v          v           v
- * +------+   +------+   +------+
- * |cell_0|   |cell_1|   |cell_2|  <- The logical read order should be
- * +------+   +------+   +------+       cell_0, cell_1, ...
- * |cell_3|   |cell_4|   |cell_5|
- * +------+   +------+   +------+
- * |cell_6|   |cell_7|   |cell_8|
- * +------+   +------+   +------+
- * |cell_9|
- * +------+  <- A cell contains {@link #cellSize} bytes of data
- *
- * Three styles of read will eventually be supported:
- *   1. Stateful read
- *   2. pread without decode support
- *     This is implemented by calculating the portion of read from each block and
- *     issuing requests to each DataNode in parallel.
- *   3. pread with decode support: TODO: will be supported after HDFS-7678
- *****************************************************************************/
+/**
+ * DFSStripedInputStream reads from striped block groups.
+ */
 public class DFSStripedInputStream extends DFSInputStream {
 
   private static class ReaderRetryPolicy {
@@ -207,22 +185,24 @@ public class DFSStripedInputStream extends DFSInputStream {
     currentLocatedBlock = targetBlockGroup;
 
     final long offsetIntoBlockGroup = getOffsetInBlockGroup();
-    LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
+    LocatedBlock[] targetBlocks = parseStripedBlockGroup(
         targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
-    // The purpose is to get start offset into each block
-    ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
-        offsetIntoBlockGroup, 0, 0);
+    // The purpose is to get start offset into each block.
+    long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
+        targetBlockGroup, offsetIntoBlockGroup);
+    Preconditions.checkNotNull(offsetsForInternalBlocks);
 
     final ReaderRetryPolicy retry = new ReaderRetryPolicy();
     for (int i = 0; i < groupSize; i++) {
       LocatedBlock targetBlock = targetBlocks[i];
       if (targetBlock != null) {
+        long offsetInBlock = offsetsForInternalBlocks[i] < 0 ?
+            0 : offsetsForInternalBlocks[i];
         DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
         if (retval != null) {
           currentNodes[i] = retval.info;
           blockReaders[i] = getBlockReaderWithRetry(targetBlock,
-              readPortions[i].getStartOffsetInBlock(),
-              targetBlock.getBlockSize() - readPortions[i].getStartOffsetInBlock(),
+              offsetInBlock, targetBlock.getBlockSize() - offsetInBlock,
               retval.addr, retval.storageType, retval.info, target, retry);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/12d030be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 7b3c24d..a1c0f72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -301,12 +301,12 @@ public final class ErasureCodingWorker {
     }
 
     private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
-      return StripedBlockUtil.constructStripedBlock(blockGroup, cellSize,
+      return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize,
           dataBlkNum, i);
     }
 
     private long getBlockLen(ExtendedBlock blockGroup, int i) { 
-      return StripedBlockUtil.getStripedBlockLength(blockGroup.getNumBytes(),
+      return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(),
           cellSize, dataBlkNum, i);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/12d030be/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 81c0c95..2fa3fdf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -41,7 +41,28 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Utility class for analyzing striped block groups
+ * When accessing a file in striped layout, operations on logical byte ranges
+ * in the file need to be mapped to physical byte ranges on block files stored
+ * on DataNodes. This utility class facilitates this mapping by defining and
+ * exposing a number of striping-related concepts. The most basic ones are
+ * illustrated in the following diagram. Unless otherwise specified, all
+ * range-related calculations are inclusive (the end offset of the previous
+ * range should be 1 byte lower than the start offset of the next one).
+ *
+ *  | <----  Block Group ----> |   <- Block Group: logical unit composing
+ *  |                          |        striped HDFS files.
+ *  blk_0      blk_1       blk_2   <- Internal Blocks: each internal block
+ *    |          |           |          represents a physically stored local
+ *    v          v           v          block file
+ * +------+   +------+   +------+
+ * |cell_0|   |cell_1|   |cell_2|  <- {@link StripingCell} represents the
+ * +------+   +------+   +------+       logical order that a Block Group should
+ * |cell_3|   |cell_4|   |cell_5|       be accessed: cell_0, cell_1, ...
+ * +------+   +------+   +------+
+ * |cell_6|   |cell_7|   |cell_8|
+ * +------+   +------+   +------+
+ * |cell_9|
+ * +------+  <- A cell contains cellSize bytes of data
  */
 @InterfaceAudience.Private
 public class StripedBlockUtil {
@@ -103,31 +124,6 @@ public class StripedBlockUtil {
         cellSize, dataBlkNum, idxInBlockGroup));
     return block;
   }
-  
-  /**
-   * This method creates an internal {@link ExtendedBlock} at the given index
-   * of a block group, for both data and parity block.
-   */
-  public static ExtendedBlock constructStripedBlock(ExtendedBlock blockGroup,
-      int cellSize, int dataBlkNum, int idxInBlockGroup) {
-    ExtendedBlock block = new ExtendedBlock(blockGroup);
-    block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
-    block.setNumBytes(getStripedBlockLength(blockGroup.getNumBytes(), cellSize,
-        dataBlkNum, idxInBlockGroup));
-    return block;
-  }
-
-  /**
-   * Returns an internal block length at the given index of a block group,
-   * for both data and parity block.
-   */
-  public static long getStripedBlockLength(long numBytes, int cellSize,
-      int dataBlkNum, int idxInBlockGroup) {
-    // parity block length is the same as the first striped block length. 
-    return StripedBlockUtil.getInternalBlockLength(
-        numBytes, cellSize, dataBlkNum, 
-        idxInBlockGroup < dataBlkNum ? idxInBlockGroup : 0);
-  }
 
   /**
    * Get the size of an internal block at the given index of a block group
@@ -157,7 +153,7 @@ public class StripedBlockUtil {
     return (numStripes - 1L)*cellSize
         + lastCellSize(lastStripeDataLen, cellSize, numDataBlocks, i);
   }
-  
+
   private static int lastCellSize(int size, int cellSize, int numDataBlocks,
       int i) {
     if (i < numDataBlocks) {
@@ -184,60 +180,6 @@ public class StripedBlockUtil {
   }
 
   /**
-   * This method plans the read portion from each block in the stripe
-   * @param dataBlkNum The number of data blocks in the striping group
-   * @param cellSize The size of each striping cell
-   * @param startInBlk Starting offset in the striped block
-   * @param len Length of the read request
-   * @param bufOffset  Initial offset in the result buffer
-   * @return array of {@link ReadPortion}, each representing the portion of I/O
-   *         for an individual block in the group
-   */
-  @VisibleForTesting
-  public static ReadPortion[] planReadPortions(final int dataBlkNum,
-      final int cellSize, final long startInBlk, final int len, int bufOffset) {
-    ReadPortion[] results = new ReadPortion[dataBlkNum];
-    for (int i = 0; i < dataBlkNum; i++) {
-      results[i] = new ReadPortion();
-    }
-
-    // cellIdxInBlk is the index of the cell in the block
-    // E.g., cell_3 is the 2nd cell in blk_0
-    int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));
-
-    // blkIdxInGroup is the index of the block in the striped block group
-    // E.g., blk_2 is the 3rd block in the group
-    final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
-    results[blkIdxInGroup].setStartOffsetInBlock(cellSize * cellIdxInBlk +
-        startInBlk % cellSize);
-    boolean crossStripe = false;
-    for (int i = 1; i < dataBlkNum; i++) {
-      if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
-        cellIdxInBlk++;
-        crossStripe = true;
-      }
-      results[(blkIdxInGroup + i) % dataBlkNum].setStartOffsetInBlock(
-          cellSize * cellIdxInBlk);
-    }
-
-    int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
-    results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
-    results[blkIdxInGroup].lengths.add(firstCellLen);
-    results[blkIdxInGroup].addReadLength(firstCellLen);
-
-    int i = (blkIdxInGroup + 1) % dataBlkNum;
-    for (int done = firstCellLen; done < len; done += cellSize) {
-      ReadPortion rp = results[i];
-      rp.offsetsInBuf.add(done + bufOffset);
-      final int readLen = Math.min(len - done, cellSize);
-      rp.lengths.add(readLen);
-      rp.addReadLength(readLen);
-      i = (i + 1) % dataBlkNum;
-    }
-    return results;
-  }
-
-  /**
    * Get the next completed striped read task
    *
    * @return {@link StripingChunkReadResult} indicating the status of the read task
@@ -360,84 +302,167 @@ public class StripedBlockUtil {
   }
 
   /**
-   * This method divides a requested byte range into an array of
-   * {@link AlignedStripe}
-   *
+   * This method divides a requested byte range into an array of inclusive
+   * {@link AlignedStripe}.
+   * @param ecSchema The codec schema for the file, which carries the numbers
+   *                 of data / parity blocks, as well as cell size
+   * @param blockGroup The striped block group
+   * @param rangeStartInBlockGroup The byte range's start offset in block group
+   * @param rangeEndInBlockGroup The byte range's end offset in block group
+   * @param buf Destination buffer of the read operation for the byte range
+   * @param offsetInBuf Start offset into the destination buffer
    *
-   * At most 5 stripes will be generated from each logical range
-   * TODO: cleanup and get rid of planReadPortions
+   * At most 5 stripes will be generated from each logical range, as
+   * demonstrated in the header of {@link AlignedStripe}.
    */
   public static AlignedStripe[] divideByteRangeIntoStripes (
-      ECSchema ecSchema, LocatedStripedBlock blockGroup, long start, long end,
-      byte[] buf, int offsetInBuf) {
+      ECSchema ecSchema, LocatedStripedBlock blockGroup,
+      long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
+      int offsetInBuf) {
     // TODO: change ECSchema naming to use cell size instead of chunk size
 
     // Step 0: analyze range and calculate basic parameters
     int cellSize = ecSchema.getChunkSize();
     int dataBlkNum = ecSchema.getNumDataUnits();
-    int len = (int) (end - start + 1);
-    int firstCellIdxInBG = (int) (start / cellSize);
-    int lastCellIdxInBG = (int) (end / cellSize);
-    int firstCellSize = Math.min(cellSize - (int) (start % cellSize), len);
-    long firstCellOffsetInBlk = firstCellIdxInBG / dataBlkNum * cellSize +
-        start % cellSize;
-    int lastCellSize = lastCellIdxInBG == firstCellIdxInBG ?
-        firstCellSize : (int) (end % cellSize) + 1;
-
-    // Step 1: get the unmerged ranges on each internal block
-    // TODO: StripingCell should carry info on size and start offset (HDFS-8320)
-    VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema,
-        firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
-        lastCellSize);
-
-    // Step 2: merge into at most 5 stripes
+
+    // Step 1: map the byte range to StripingCells
+    StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, blockGroup,
+        rangeStartInBlockGroup, rangeEndInBlockGroup);
+
+    // Step 2: get the unmerged ranges on each internal block
+    VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cells);
+
+    // Step 3: merge into at most 5 stripes
     AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
 
-    // Step 3: calculate each chunk's position in destination buffer
-    calcualteChunkPositionsInBuf(ecSchema, blockGroup, buf, offsetInBuf,
-        firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
-        lastCellSize, stripes);
+    // Step 4: calculate each chunk's position in destination buffer
+    calcualteChunkPositionsInBuf(ecSchema, stripes, cells, buf, offsetInBuf);
 
-    // Step 4: prepare ALLZERO blocks
+    // Step 5: prepare ALLZERO blocks
     prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
 
     return stripes;
   }
 
-  private static VerticalRange[] getRangesForInternalBlocks (ECSchema ecSchema,
-      int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
-      long firstCellOffsetInBlk, int lastCellSize) {
+  /**
+   * Map the logical byte range to a set of inclusive {@link StripingCell}
+   * instances, each representing the overlap of the byte range with a cell
+   * used by {@link DFSStripedOutputStream} in encoding.
+   */
+  @VisibleForTesting
+  private static StripingCell[] getStripingCellsOfByteRange(ECSchema ecSchema,
+      LocatedStripedBlock blockGroup,
+      long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
+    Preconditions.checkArgument(
+        rangeStartInBlockGroup <= rangeEndInBlockGroup &&
+            rangeEndInBlockGroup < blockGroup.getBlockSize());
     int cellSize = ecSchema.getChunkSize();
-    int dataBlkNum = ecSchema.getNumDataUnits();
+    int len = (int) (rangeEndInBlockGroup - rangeStartInBlockGroup + 1);
+    int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
+    int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize);
+    int numCells = lastCellIdxInBG - firstCellIdxInBG + 1;
+    StripingCell[] cells = new StripingCell[numCells];
+    cells[0] = new StripingCell(ecSchema, firstCellIdxInBG);
+    cells[numCells - 1] = new StripingCell(ecSchema, lastCellIdxInBG);
+
+    cells[0].offset = (int) (rangeStartInBlockGroup % cellSize);
+    cells[0].size =
+        Math.min(cellSize - (int) (rangeStartInBlockGroup % cellSize), len);
+    if (lastCellIdxInBG != firstCellIdxInBG) {
+      cells[numCells - 1].size = (int) (rangeEndInBlockGroup % cellSize) + 1;
+    }
+
+    for (int i = 1; i < numCells - 1; i++) {
+      cells[i] = new StripingCell(ecSchema, i + firstCellIdxInBG);
+    }
+
+    return cells;
+  }
 
+  /**
+   * Given a logical start offset in a block group, calculate the physical
+   * start offset into each stored internal block.
+   */
+  public static long[] getStartOffsetsForInternalBlocks(
+      ECSchema ecSchema, LocatedStripedBlock blockGroup,
+      long rangeStartInBlockGroup) {
+    Preconditions.checkArgument(
+        rangeStartInBlockGroup < blockGroup.getBlockSize());
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNum = ecSchema.getNumParityUnits();
+    int cellSize = ecSchema.getChunkSize();
+    long[] startOffsets = new long[dataBlkNum + parityBlkNum];
+    Arrays.fill(startOffsets, -1L);
+    int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
     StripingCell firstCell = new StripingCell(ecSchema, firstCellIdxInBG);
-    StripingCell lastCell = new StripingCell(ecSchema, lastCellIdxInBG);
+    firstCell.offset = (int) (rangeStartInBlockGroup % cellSize);
+    startOffsets[firstCell.idxInStripe] =
+        firstCell.idxInInternalBlk * cellSize + firstCell.offset;
+    long earliestStart = startOffsets[firstCell.idxInStripe];
+    for (int i = 1; i < dataBlkNum; i++) {
+      int idx = firstCellIdxInBG + i;
+      if (idx * cellSize >= blockGroup.getBlockSize()) {
+        break;
+      }
+      StripingCell cell = new StripingCell(ecSchema, idx);
+      startOffsets[cell.idxInStripe] = cell.idxInInternalBlk * cellSize;
+      if (startOffsets[cell.idxInStripe] < earliestStart) {
+        earliestStart = startOffsets[cell.idxInStripe];
+      }
+    }
+    for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
+      startOffsets[i] = earliestStart;
+    }
+    return startOffsets;
+  }
 
-    VerticalRange ranges[] = new VerticalRange[dataBlkNum];
-    ranges[firstCell.idxInStripe] =
-        new VerticalRange(firstCellOffsetInBlk, firstCellSize);
-    for (int i = firstCellIdxInBG + 1; i < lastCellIdxInBG; i++) {
+  /**
+   * Given a logical byte range, mapped to each {@link StripingCell}, calculate
+   * the physical byte range (inclusive) on each stored internal block.
+   */
+  @VisibleForTesting
+  private static VerticalRange[] getRangesForInternalBlocks(ECSchema ecSchema,
+      StripingCell[] cells) {
+    int cellSize = ecSchema.getChunkSize();
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNum = ecSchema.getNumParityUnits();
+
+    VerticalRange ranges[] = new VerticalRange[dataBlkNum + parityBlkNum];
+
+    long earliestStart = Long.MAX_VALUE;
+    long latestEnd = -1;
+    for (StripingCell cell : cells) {
       // iterate through all cells and update the list of StripeRanges
-      StripingCell cell = new StripingCell(ecSchema, i);
       if (ranges[cell.idxInStripe] == null) {
         ranges[cell.idxInStripe] = new VerticalRange(
-            cell.idxInInternalBlk * cellSize, cellSize);
+            cell.idxInInternalBlk * cellSize + cell.offset, cell.size);
       } else {
-        ranges[cell.idxInStripe].spanInBlock += cellSize;
+        ranges[cell.idxInStripe].spanInBlock += cell.size;
+      }
+      VerticalRange range = ranges[cell.idxInStripe];
+      if (range.offsetInBlock < earliestStart) {
+        earliestStart = range.offsetInBlock;
+      }
+      if (range.offsetInBlock + range.spanInBlock - 1 > latestEnd) {
+        latestEnd = range.offsetInBlock + range.spanInBlock - 1;
       }
     }
-    if (ranges[lastCell.idxInStripe] == null) {
-      ranges[lastCell.idxInStripe] = new VerticalRange(
-          lastCell.idxInInternalBlk * cellSize, lastCellSize);
-    } else if (lastCell.idxInBlkGroup != firstCell.idxInBlkGroup) {
-      ranges[lastCell.idxInStripe].spanInBlock += lastCellSize;
+
+    // Each parity block is fetched over the full range spanned by data blocks
+    for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
+      ranges[i] = new VerticalRange(earliestStart,
+          latestEnd - earliestStart + 1);
     }
 
     return ranges;
   }
 
-  private static AlignedStripe[] mergeRangesForInternalBlocks(ECSchema ecSchema,
-      VerticalRange[] ranges) {
+  /**
+   * Merge byte ranges on each internal block into a set of inclusive
+   * {@link AlignedStripe} instances.
+   */
+  private static AlignedStripe[] mergeRangesForInternalBlocks(
+      ECSchema ecSchema, VerticalRange[] ranges) {
     int dataBlkNum = ecSchema.getNumDataUnits();
     int parityBlkNum = ecSchema.getNumParityUnits();
     List<AlignedStripe> stripes = new ArrayList<>();
@@ -461,12 +486,8 @@ public class StripedBlockUtil {
   }
 
   private static void calcualteChunkPositionsInBuf(ECSchema ecSchema,
-      LocatedStripedBlock blockGroup, byte[] buf, int offsetInBuf,
-      int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
-      long firstCellOffsetInBlk, int lastCellSize, AlignedStripe[] stripes) {
-    int cellSize = ecSchema.getChunkSize();
-    int dataBlkNum = ecSchema.getNumDataUnits();
-    // Step 3: calculate each chunk's position in destination buffer
+      AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
+      int offsetInBuf) {
     /**
      *     | <--------------- AlignedStripe --------------->|
      *
@@ -484,20 +505,11 @@ public class StripedBlockUtil {
      *
      * Cell indexing convention defined in {@link StripingCell}
      */
+    int cellSize = ecSchema.getChunkSize();
     int done = 0;
-    for (int i = firstCellIdxInBG; i <= lastCellIdxInBG; i++) {
-      StripingCell cell  = new StripingCell(ecSchema, i);
-      long cellStart = i == firstCellIdxInBG ?
-          firstCellOffsetInBlk : cell.idxInInternalBlk * cellSize;
-      int cellLen;
-      if (i == firstCellIdxInBG) {
-        cellLen = firstCellSize;
-      } else if (i == lastCellIdxInBG) {
-        cellLen = lastCellSize;
-      } else {
-        cellLen = cellSize;
-      }
-      long cellEnd = cellStart + cellLen - 1;
+    for (StripingCell cell : cells) {
+      long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
+      long cellEnd = cellStart + cell.size - 1;
       for (AlignedStripe s : stripes) {
         long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
         long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
@@ -514,10 +526,14 @@ public class StripedBlockUtil {
             add((int)(offsetInBuf + done + overlapStart - cellStart));
         s.chunks[cell.idxInStripe].lengthsInBuf.add(overLapLen);
       }
-      done += cellLen;
+      done += cell.size;
     }
   }
 
+  /**
+   * If a {@link StripingChunk} maps to a byte range beyond an internal block's
+   * size, the chunk should be treated as zero bytes in decoding.
+   */
   private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
       byte[] buf, AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
     for (AlignedStripe s : stripes) {
@@ -534,51 +550,13 @@ public class StripedBlockUtil {
   }
 
   /**
-   * This class represents the portion of I/O associated with each block in the
-   * striped block group.
-   * TODO: consolidate ReadPortion with AlignedStripe
-   */
-  public static class ReadPortion {
-    private long startOffsetInBlock = 0;
-    private int readLength = 0;
-    public final List<Integer> offsetsInBuf = new ArrayList<>();
-    public final List<Integer> lengths = new ArrayList<>();
-
-    public int[] getOffsets() {
-      int[] offsets = new int[offsetsInBuf.size()];
-      for (int i = 0; i < offsets.length; i++) {
-        offsets[i] = offsetsInBuf.get(i);
-      }
-      return offsets;
-    }
-
-    public int[] getLengths() {
-      int[] lens = new int[this.lengths.size()];
-      for (int i = 0; i < lens.length; i++) {
-        lens[i] = this.lengths.get(i);
-      }
-      return lens;
-    }
-
-    public long getStartOffsetInBlock() {
-      return startOffsetInBlock;
-    }
-
-    public int getReadLength() {
-      return readLength;
-    }
-
-    public void setStartOffsetInBlock(long startOffsetInBlock) {
-      this.startOffsetInBlock = startOffsetInBlock;
-    }
-
-    void addReadLength(int extraLength) {
-      this.readLength += extraLength;
-    }
-  }
-
-  /**
-   * The unit of encoding used in {@link DFSStripedOutputStream}
+   * Cell is the unit of encoding used in {@link DFSStripedOutputStream}. Its
+   * size impacts how a logical offset in the file or block group translates
+   * to physical byte offset in a stored internal block. The StripingCell util
+   * class facilitates this calculation. Each StripingCell is inclusive with
+   * its start and end offsets -- e.g., the end logical offset of cell_0_0_0
+   * should be 1 byte lower than the start logical offset of cell_1_0_1.
+   *
    *  | <------- Striped Block Group -------> |
    *    blk_0          blk_1          blk_2
    *      |              |              |
@@ -586,43 +564,57 @@ public class StripedBlockUtil {
    * +----------+   +----------+   +----------+
    * |cell_0_0_0|   |cell_1_0_1|   |cell_2_0_2|
    * +----------+   +----------+   +----------+
-   * |cell_3_1_0|   |cell_4_1_1|   |cell_5_1_2| <- {@link idxInBlkGroup} = 5
-   * +----------+   +----------+   +----------+    {@link idxInInternalBlk} = 1
-   *                                               {@link idxInStripe} = 2
+   * |cell_3_1_0|   |cell_4_1_1|   |cell_5_1_2| <- {@link #idxInBlkGroup} = 5
+   * +----------+   +----------+   +----------+    {@link #idxInInternalBlk} = 1
+   *                                               {@link #idxInStripe} = 2
    * A StripingCell is a special instance of {@link StripingChunk} whose offset
    * and size align with the cell used when writing data.
    * TODO: consider parity cells
    */
-  public static class StripingCell {
+  @VisibleForTesting
+  static class StripingCell {
     public final ECSchema schema;
     /** Logical order in a block group, used when doing I/O to a block group */
-    public final int idxInBlkGroup;
-    public final int idxInInternalBlk;
-    public final int idxInStripe;
+    final int idxInBlkGroup;
+    final int idxInInternalBlk;
+    final int idxInStripe;
+    /**
+     * When a logical byte range is mapped to a set of cells, it might
+     * partially overlap with the first and last cells. This field and the
+     * {@link #size} variable represent the start offset and size of the
+     * overlap.
+     */
+    int offset;
+    int size;
 
-    public StripingCell(ECSchema ecSchema, int idxInBlkGroup) {
+    StripingCell(ECSchema ecSchema, int idxInBlkGroup) {
       this.schema = ecSchema;
       this.idxInBlkGroup = idxInBlkGroup;
       this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits();
       this.idxInStripe = idxInBlkGroup -
           this.idxInInternalBlk * ecSchema.getNumDataUnits();
+      this.offset = 0;
+      this.size = ecSchema.getChunkSize();
     }
 
-    public StripingCell(ECSchema ecSchema, int idxInInternalBlk,
+    StripingCell(ECSchema ecSchema, int idxInInternalBlk,
         int idxInStripe) {
       this.schema = ecSchema;
       this.idxInInternalBlk = idxInInternalBlk;
       this.idxInStripe = idxInStripe;
       this.idxInBlkGroup =
           idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe;
+      this.offset = 0;
+      this.size = ecSchema.getChunkSize();
     }
   }
 
   /**
    * Given a requested byte range on a striped block group, an AlignedStripe
-   * represents a {@link VerticalRange} that is aligned with both the byte range
-   * and boundaries of all internal blocks. As illustrated in the diagram, any
-   * given byte range on a block group leads to 1~5 AlignedStripe's.
+   * represents an inclusive {@link VerticalRange} that is aligned with both
+   * the byte range and boundaries of all internal blocks. As illustrated in
+   * the diagram, any given byte range on a block group leads to 1~5
+   * AlignedStripe's.
    *
    * |<-------- Striped Block Group -------->|
    * blk_0   blk_1   blk_2      blk_3   blk_4
@@ -648,6 +640,7 @@ public class StripedBlockUtil {
    *
    * The coverage of an AlignedStripe on an internal block is represented as a
    * {@link StripingChunk}.
+   *
    * To simplify the logic of reading a logical byte range from a block group,
    * a StripingChunk is either completely in the requested byte range or
    * completely outside the requested byte range.
@@ -692,19 +685,19 @@ public class StripedBlockUtil {
 
   /**
    * A simple utility class representing an arbitrary vertical inclusive range
-   * starting at {@link offsetInBlock} and lasting for {@link length} bytes in
-   * an internal block. Note that VerticalRange doesn't necessarily align with
-   * {@link StripingCell}.
+   * starting at {@link #offsetInBlock} and lasting for {@link #spanInBlock}
+   * bytes in an internal block. Note that VerticalRange doesn't necessarily
+   * align with {@link StripingCell}.
    *
    * |<- Striped Block Group ->|
    *  blk_0
    *    |
    *    v
    * +-----+
-   * |~~~~~| <-- {@link offsetInBlock}
+   * |~~~~~| <-- {@link #offsetInBlock}
    * |     |  ^
    * |     |  |
-   * |     |  | {@link spanInBlock}
+   * |     |  | {@link #spanInBlock}
    * |     |  v
    * |~~~~~| ---
    * |     |
@@ -743,9 +736,9 @@ public class StripedBlockUtil {
    * +---------+  +---------+               |  +----+  +----+
    * <----------- data blocks ------------> | <--- parity --->
    *
-   * The class also carries {@link buf}, {@link offsetsInBuf}, and
-   * {@link lengthsInBuf} to define how read task for this chunk should deliver
-   * the returned data.
+   * The class also carries {@link #buf}, {@link #offsetsInBuf}, and
+   * {@link #lengthsInBuf} to define how the read task for this chunk should
+   * deliver the returned data.
    */
   public static class StripingChunk {
     /** Chunk has been successfully fetched */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/12d030be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
deleted file mode 100644
index 75d0587..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.junit.Test;
-
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
-import static org.junit.Assert.*;
-
-public class TestPlanReadPortions {
-
-  // We only support this as num of data blocks. It might be good enough for now
-  // for the purpose, even not flexible yet for any number in a schema.
-  private final short GROUP_SIZE = 3;
-  private final int CELLSIZE = 128 * 1024;
-
-  private void testPlanReadPortions(int startInBlk, int length,
-      int bufferOffset, int[] readLengths, int[] offsetsInBlock,
-      int[][] bufferOffsets, int[][] bufferLengths) {
-    ReadPortion[] results = StripedBlockUtil.planReadPortions(GROUP_SIZE,
-        CELLSIZE, startInBlk, length, bufferOffset);
-    assertEquals(GROUP_SIZE, results.length);
-
-    for (int i = 0; i < GROUP_SIZE; i++) {
-      assertEquals(readLengths[i], results[i].getReadLength());
-      assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
-      final int[] bOffsets = results[i].getOffsets();
-      assertArrayEquals(bufferOffsets[i], bOffsets);
-      final int[] bLengths = results[i].getLengths();
-      assertArrayEquals(bufferLengths[i], bLengths);
-    }
-  }
-
-  /**
-   * Test {@link StripedBlockUtil#planReadPortions}
-   */
-  @Test
-  public void testPlanReadPortions() {
-    /**
-     * start block offset is 0, read cellSize - 10
-     */
-    testPlanReadPortions(0, CELLSIZE - 10, 0,
-        new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
-        new int[][]{new int[]{0}, new int[]{}, new int[]{}},
-        new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
-
-    /**
-     * start block offset is 0, read 3 * cellSize
-     */
-    testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
-        new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
-        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
-        new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
-
-    /**
-     * start block offset is 0, read cellSize + 10
-     */
-    testPlanReadPortions(0, CELLSIZE + 10, 0,
-        new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
-        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
-        new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
-
-    /**
-     * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
-     */
-    testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
-        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
-        new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
-            new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
-            new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
-        new int[][]{new int[]{CELLSIZE, CELLSIZE},
-            new int[]{CELLSIZE, CELLSIZE},
-            new int[]{CELLSIZE, 10}});
-
-    /**
-     * start block offset is 2, read 3 * cellSize
-     */
-    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
-        new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
-        new int[]{2, 0, 0},
-        new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
-            new int[]{100 + CELLSIZE - 2},
-            new int[]{100 + CELLSIZE * 2 - 2}},
-        new int[][]{new int[]{CELLSIZE - 2, 2},
-            new int[]{CELLSIZE},
-            new int[]{CELLSIZE}});
-
-    /**
-     * start block offset is 2, read 3 * cellSize + 10
-     */
-    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
-        new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
-        new int[]{2, 0, 0},
-        new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
-            new int[]{CELLSIZE - 2},
-            new int[]{CELLSIZE * 2 - 2}},
-        new int[][]{new int[]{CELLSIZE - 2, 12},
-            new int[]{CELLSIZE},
-            new int[]{CELLSIZE}});
-
-    /**
-     * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
-     */
-    testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
-        new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
-        new int[]{CELLSIZE, CELLSIZE - 1, 0},
-        new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
-            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
-            new int[]{1, 3 * CELLSIZE + 1}},
-        new int[][]{new int[]{CELLSIZE, CELLSIZE},
-            new int[]{1, CELLSIZE, 9},
-            new int[]{CELLSIZE, CELLSIZE}});
-
-    /**
-     * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
-     */
-    testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
-        new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
-        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
-        new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
-            new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
-            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
-        new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
-            new int[]{CELLSIZE, CELLSIZE, 9},
-            new int[]{1, CELLSIZE, CELLSIZE}});
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/12d030be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
index b4f05d4..dfdcee2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
@@ -189,13 +189,13 @@ public class TestRecoverStripedFile {
       deadDnIndices[i] = dnMap.get(dataDNs[i]);
       
       // Check the block replica file on deadDn before it dead.
-      blocks[i] = StripedBlockUtil.constructStripedBlock(
+      blocks[i] = StripedBlockUtil.constructInternalBlock(
           lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
       replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
       metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
       // the block replica on the datanode should be the same as expected
       assertEquals(replicas[i].length(), 
-          StripedBlockUtil.getStripedBlockLength(
+          StripedBlockUtil.getInternalBlockLength(
           lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
       assertTrue(metadatas[i].getName().
           endsWith(blocks[i].getGenerationStamp() + ".meta"));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/12d030be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
index ec0b1bb..6f29d69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.util;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -26,26 +27,107 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.*;
+
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Random;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
+/**
+ * Need to cover the following combinations:
+ * 1. Block group size:
+ *  1.1 One byte
+ *  1.2 Smaller than cell
+ *  1.3 One full cell
+ *  1.4 x full cells, where x is smaller than number of data blocks
+ *  1.5 x full cells plus a partial cell
+ *  1.6 One full stripe
+ *  1.7 One full stripe plus a partial cell
+ *  1.8 One full stripe plus x full cells
+ *  1.9 One full stripe plus x full cells plus a partial cell
+ *  1.10 y full stripes, but smaller than full block group size
+ *  1.11 Full block group size
+ *
+ * 2. Byte range start
+ *  2.1 Zero
+ *  2.2 Within first cell
+ *  2.3 End of first cell
+ *  2.4 Start of a middle* cell in the first stripe (* neither first nor last)
+ *  2.5 End of middle cell in the first stripe
+ *  2.6 Within a middle cell in the first stripe
+ *  2.7 Start of the last cell in the first stripe
+ *  2.8 Within the last cell in the first stripe
+ *  2.9 End of the last cell in the first stripe
+ *  2.10 Start of a middle stripe
+ *  2.11 Within a middle stripe
+ *  2.12 End of a middle stripe
+ *  2.13 Start of the last stripe
+ *  2.14 Within the last stripe
+ *  2.15 End of the last stripe (last byte)
+ *
+ * 3. Byte range length: same settings as block group size
+ *
+ * We should test in total 11 x 15 x 11 = 1815 combinations
+ * TODO: test parity block logic
+ */
 public class TestStripedBlockUtil {
   private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
   private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
-  private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
+  private final short BLK_GROUP_WIDTH = DATA_BLK_NUM + PARITY_BLK_NUM;
   private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int FULL_STRIPE_SIZE = DATA_BLK_NUM * CELLSIZE;
+  /** number of full stripes in a full block group */
+  private final int BLK_GROUP_STRIPE_NUM = 16;
+  private final ECSchema SCEHMA = ErasureCodingSchemaManager.
+      getSystemDefaultSchema();
+  private final Random random = new Random();
+
+  private int[] blockGroupSizes;
+  private int[] byteRangeStartOffsets;
+  private int[] byteRangeSizes;
+
+  @Before
+  public void setup(){
+    blockGroupSizes = new int[]{1, getDelta(CELLSIZE), CELLSIZE,
+        getDelta(DATA_BLK_NUM) * CELLSIZE,
+        getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE),
+        FULL_STRIPE_SIZE, FULL_STRIPE_SIZE + getDelta(CELLSIZE),
+        FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE,
+        FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE),
+        getDelta(BLK_GROUP_STRIPE_NUM) * FULL_STRIPE_SIZE,
+        BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE};
+    byteRangeStartOffsets = new int[] {0, getDelta(CELLSIZE), CELLSIZE - 1};
+    byteRangeSizes = new int[]{1, getDelta(CELLSIZE), CELLSIZE,
+        getDelta(DATA_BLK_NUM) * CELLSIZE,
+        getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE),
+        FULL_STRIPE_SIZE, FULL_STRIPE_SIZE + getDelta(CELLSIZE),
+        FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE,
+        FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE),
+        getDelta(BLK_GROUP_STRIPE_NUM) * FULL_STRIPE_SIZE,
+        BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE};
+  }
 
-  private LocatedStripedBlock createDummyLocatedBlock() {
+  private int getDelta(int size) {
+    return 1 + random.nextInt(size - 2);
+  }
+  private byte hashIntToByte(int i) {
+    int BYTE_MASK = 0xff;
+    return (byte) (((i + 13) * 29) & BYTE_MASK);
+  }
+
+  private LocatedStripedBlock createDummyLocatedBlock(int bgSize) {
     final long blockGroupID = -1048576;
-    DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_SIZE];
-    String[] storageIDs = new String[BLK_GROUP_SIZE];
-    StorageType[] storageTypes = new StorageType[BLK_GROUP_SIZE];
-    int[] indices = new int[BLK_GROUP_SIZE];
-    for (int i = 0; i < BLK_GROUP_SIZE; i++) {
+    DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_WIDTH];
+    String[] storageIDs = new String[BLK_GROUP_WIDTH];
+    StorageType[] storageTypes = new StorageType[BLK_GROUP_WIDTH];
+    int[] indices = new int[BLK_GROUP_WIDTH];
+    for (int i = 0; i < BLK_GROUP_WIDTH; i++) {
       indices[i] = (i + 2) % DATA_BLK_NUM;
       // Location port always equal to logical index of a block,
       // for easier verification
@@ -53,13 +135,40 @@ public class TestStripedBlockUtil {
       storageIDs[i] = locs[i].getDatanodeUuid();
       storageTypes[i] = StorageType.DISK;
     }
-    return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
-        locs, storageIDs, storageTypes, indices, 0, false, null);
+    return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID,
+        bgSize, 1001), locs, storageIDs, storageTypes, indices, 0, false,
+        null);
+  }
+
+  private byte[][] createInternalBlkBuffers(int bgSize) {
+    byte[][] bufs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][];
+    int[] pos = new int[DATA_BLK_NUM + PARITY_BLK_NUM];
+    for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
+      int bufSize = (int) getInternalBlockLength(
+          bgSize, CELLSIZE, DATA_BLK_NUM, i);
+      bufs[i] = new byte[bufSize];
+      pos[i] = 0;
+    }
+    int done = 0;
+    while (done < bgSize) {
+      Preconditions.checkState(done % CELLSIZE == 0);
+      StripingCell cell = new StripingCell(SCEHMA, done / CELLSIZE);
+      int idxInStripe = cell.idxInStripe;
+      int size = Math.min(CELLSIZE, bgSize - done);
+      for (int i = 0; i < size; i++) {
+        bufs[idxInStripe][pos[idxInStripe] + i] = hashIntToByte(done + i);
+      }
+      done += size;
+      pos[idxInStripe] += size;
+    }
+
+    return bufs;
   }
 
   @Test
   public void testParseDummyStripedBlock() {
-    LocatedStripedBlock lsb = createDummyLocatedBlock();
+    LocatedStripedBlock lsb = createDummyLocatedBlock(
+        BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE);
     LocatedBlock[] blocks = parseStripedBlockGroup(
         lsb, CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
     assertEquals(DATA_BLK_NUM + PARITY_BLK_NUM, blocks.length);
@@ -68,14 +177,15 @@ public class TestStripedBlockUtil {
       assertEquals(i,
           BlockIdManager.getBlockIndex(blocks[i].getBlock().getLocalBlock()));
       assertEquals(i * CELLSIZE, blocks[i].getStartOffset());
+      /** TODO: properly define {@link LocatedBlock#offset} for internal blocks */
       assertEquals(1, blocks[i].getLocations().length);
       assertEquals(i, blocks[i].getLocations()[0].getIpcPort());
       assertEquals(i, blocks[i].getLocations()[0].getXferPort());
     }
   }
 
-  private void verifyInternalBlocks (long numBytesInGroup, long[] expected) {
-    for (int i = 1; i < BLK_GROUP_SIZE; i++) {
+  private void verifyInternalBlocks (int numBytesInGroup, int[] expected) {
+    for (int i = 1; i < BLK_GROUP_WIDTH; i++) {
       assertEquals(expected[i],
           getInternalBlockLength(numBytesInGroup, CELLSIZE, DATA_BLK_NUM, i));
     }
@@ -85,41 +195,85 @@ public class TestStripedBlockUtil {
   public void testGetInternalBlockLength () {
     // A small delta that is smaller than a cell
     final int delta = 10;
-    assert delta < CELLSIZE;
 
     // Block group is smaller than a cell
     verifyInternalBlocks(CELLSIZE - delta,
-        new long[] {CELLSIZE - delta, 0, 0, 0, 0, 0,
+        new int[] {CELLSIZE - delta, 0, 0, 0, 0, 0,
             CELLSIZE - delta, CELLSIZE - delta, CELLSIZE - delta});
 
     // Block group is exactly as large as a cell
     verifyInternalBlocks(CELLSIZE,
-        new long[] {CELLSIZE, 0, 0, 0, 0, 0,
+        new int[] {CELLSIZE, 0, 0, 0, 0, 0,
             CELLSIZE, CELLSIZE, CELLSIZE});
 
     // Block group is a little larger than a cell
     verifyInternalBlocks(CELLSIZE + delta,
-        new long[] {CELLSIZE, delta, 0, 0, 0, 0,
+        new int[] {CELLSIZE, delta, 0, 0, 0, 0,
             CELLSIZE, CELLSIZE, CELLSIZE});
 
     // Block group contains multiple stripes and ends at stripe boundary
     verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE,
-        new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
+        new int[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
             2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
             2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE});
 
     // Block group contains multiple stripes and ends at cell boundary
     // (not ending at stripe boundary)
     verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE + CELLSIZE,
-        new long[] {3 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
+        new int[] {3 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
             2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
             3 * CELLSIZE, 3 * CELLSIZE, 3 * CELLSIZE});
 
     // Block group contains multiple stripes and doesn't end at cell boundary
     verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE - delta,
-        new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
+        new int[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
             2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE - delta,
             2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE});
   }
 
+  /**
+   * Test dividing a byte range into aligned stripes and verify the aligned
+   * ranges can be translated back to the byte range.
+   */
+  @Test
+  public void testDivideByteRangeIntoStripes() {
+    byte[] assembled = new byte[BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE];
+    for (int bgSize : blockGroupSizes) {
+      LocatedStripedBlock blockGroup = createDummyLocatedBlock(bgSize);
+      byte[][] internalBlkBufs = createInternalBlkBuffers(bgSize);
+      for (int brStart : byteRangeStartOffsets) {
+        for (int brSize : byteRangeSizes) {
+          if (brStart + brSize > bgSize) {
+            continue;
+          }
+          AlignedStripe[] stripes = divideByteRangeIntoStripes(SCEHMA,
+              blockGroup, brStart, brStart + brSize - 1, assembled, 0);
+
+          for (AlignedStripe stripe : stripes) {
+            for (int i = 0; i < DATA_BLK_NUM; i++) {
+              StripingChunk chunk = stripe.chunks[i];
+              if (chunk == null || chunk.state != StripingChunk.REQUESTED) {
+                continue;
+              }
+              int done = 0;
+              for (int j = 0; j < chunk.getLengths().length; j++) {
+                System.arraycopy(internalBlkBufs[i],
+                    (int) stripe.getOffsetInBlock() + done, assembled,
+                    chunk.getOffsets()[j], chunk.getLengths()[j]);
+                done += chunk.getLengths()[j];
+              }
+            }
+          }
+          for (int i = 0; i < brSize; i++) {
+            if (hashIntToByte(brStart + i) != assembled[i]) {
+              System.out.println("Oops");
+            }
+            assertEquals("Byte at " + (brStart + i) + " should be the same",
+                hashIntToByte(brStart + i), assembled[i]);
+          }
+        }
+      }
+    }
+  }
+
 }