You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/11 21:23:30 UTC

[30/50] hadoop git commit: HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil. Contributed by Zhe Zhang.

HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil. Contributed by Zhe Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2e70cb9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2e70cb9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2e70cb9

Branch: refs/heads/HDFS-7285
Commit: d2e70cb996e09488c7497bd3fd0548c7211ab592
Parents: a71bfe0
Author: Zhe Zhang <zh...@apache.org>
Authored: Wed Apr 29 23:49:52 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 11 11:36:22 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../hadoop/hdfs/DFSStripedInputStream.java      | 111 +-----------
 .../hadoop/hdfs/util/StripedBlockUtil.java      | 174 +++++++++++++++++++
 .../hadoop/hdfs/TestPlanReadPortions.java       |  11 +-
 4 files changed, 186 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2e70cb9/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 6a9bdee..ca60487 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -146,3 +146,6 @@
 
     HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream 
     (stateful read). (Jing Zhao via Zhe Zhang)
+
+    HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil.
+    (Zhe Zhang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2e70cb9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 3da7306..0dc98fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
+
 import org.apache.hadoop.net.NetUtils;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -31,8 +33,6 @@ import org.apache.htrace.TraceScope;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Set;
 import java.util.Map;
 import java.util.HashMap;
@@ -69,59 +69,6 @@ import java.util.concurrent.Future;
  *   3. pread with decode support: TODO: will be supported after HDFS-7678
  *****************************************************************************/
 public class DFSStripedInputStream extends DFSInputStream {
-  /**
-   * This method plans the read portion from each block in the stripe
-   * @param dataBlkNum The number of data blocks in the striping group
-   * @param cellSize The size of each striping cell
-   * @param startInBlk Starting offset in the striped block
-   * @param len Length of the read request
-   * @param bufOffset  Initial offset in the result buffer
-   * @return array of {@link ReadPortion}, each representing the portion of I/O
-   *         for an individual block in the group
-   */
-  @VisibleForTesting
-  static ReadPortion[] planReadPortions(final int dataBlkNum,
-      final int cellSize, final long startInBlk, final int len, int bufOffset) {
-    ReadPortion[] results = new ReadPortion[dataBlkNum];
-    for (int i = 0; i < dataBlkNum; i++) {
-      results[i] = new ReadPortion();
-    }
-
-    // cellIdxInBlk is the index of the cell in the block
-    // E.g., cell_3 is the 2nd cell in blk_0
-    int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));
-
-    // blkIdxInGroup is the index of the block in the striped block group
-    // E.g., blk_2 is the 3rd block in the group
-    final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
-    results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
-        startInBlk % cellSize;
-    boolean crossStripe = false;
-    for (int i = 1; i < dataBlkNum; i++) {
-      if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
-        cellIdxInBlk++;
-        crossStripe = true;
-      }
-      results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
-          cellSize * cellIdxInBlk;
-    }
-
-    int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
-    results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
-    results[blkIdxInGroup].lengths.add(firstCellLen);
-    results[blkIdxInGroup].readLength += firstCellLen;
-
-    int i = (blkIdxInGroup + 1) % dataBlkNum;
-    for (int done = firstCellLen; done < len; done += cellSize) {
-      ReadPortion rp = results[i];
-      rp.offsetsInBuf.add(done + bufOffset);
-      final int readLen = Math.min(len - done, cellSize);
-      rp.lengths.add(readLen);
-      rp.readLength += readLen;
-      i = (i + 1) % dataBlkNum;
-    }
-    return results;
-  }
 
   private static class ReaderRetryPolicy {
     private int fetchEncryptionKeyTimes = 1;
@@ -520,56 +467,4 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
     throw new InterruptedException("let's retry");
   }
-
-
-  /**
-   * This class represents the portion of I/O associated with each block in the
-   * striped block group.
-   */
-  static class ReadPortion {
-    /**
-     * startOffsetInBlock
-     *     |
-     *     v
-     *     |<-lengths[0]->|<-  lengths[1]  ->|<-lengths[2]->|
-     * +------------------+------------------+----------------+
-     * |      cell_0      |      cell_3      |     cell_6     |  <- blk_0
-     * +------------------+------------------+----------------+
-     *   _/                \_______________________
-     *  |                                          |
-     *  v offsetsInBuf[0]                          v offsetsInBuf[1]
-     * +------------------------------------------------------+
-     * |  cell_0     |      cell_1 and cell_2      |cell_3 ...|   <- buf
-     * |  (partial)  |    (from blk_1 and blk_2)   |          |
-     * +------------------------------------------------------+
-     */
-    private long startOffsetInBlock = 0;
-    private int readLength = 0;
-    private final List<Integer> offsetsInBuf = new ArrayList<>();
-    private final List<Integer> lengths = new ArrayList<>();
-
-    int[] getOffsets() {
-      int[] offsets = new int[offsetsInBuf.size()];
-      for (int i = 0; i < offsets.length; i++) {
-        offsets[i] = offsetsInBuf.get(i);
-      }
-      return offsets;
-    }
-
-    int[] getLengths() {
-      int[] lens = new int[this.lengths.size()];
-      for (int i = 0; i < lens.length; i++) {
-        lens[i] = this.lengths.get(i);
-      }
-      return lens;
-    }
-
-    int getReadLength() {
-      return readLength;
-    }
-
-    long getStartOffsetInBlock() {
-      return startOffsetInBlock;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2e70cb9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index d622d4d..cb6d39a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.util;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -27,6 +28,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 
 import com.google.common.base.Preconditions;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Utility class for analyzing striped block groups
  */
@@ -134,4 +144,168 @@ public class StripedBlockUtil {
         + offsetInBlk % cellSize; // partial cell
   }
 
+  /**
+   * This method plans the read portion from each block in the stripe
+   * @param dataBlkNum The number of data blocks in the striping group
+   * @param cellSize The size of each striping cell
+   * @param startInBlk Starting offset in the striped block
+   * @param len Length of the read request
+   * @param bufOffset  Initial offset in the result buffer
+   * @return array of {@link ReadPortion}, each representing the portion of I/O
+   *         for an individual block in the group
+   */
+  @VisibleForTesting
+  public static ReadPortion[] planReadPortions(final int dataBlkNum,
+      final int cellSize, final long startInBlk, final int len, int bufOffset) {
+    ReadPortion[] results = new ReadPortion[dataBlkNum];
+    for (int i = 0; i < dataBlkNum; i++) {
+      results[i] = new ReadPortion();
+    }
+
+    // cellIdxInBlk is the index of the cell in the block
+    // E.g., cell_3 is the 2nd cell in blk_0
+    int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));
+
+    // blkIdxInGroup is the index of the block in the striped block group
+    // E.g., blk_2 is the 3rd block in the group
+    final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
+    results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
+        startInBlk % cellSize;
+    boolean crossStripe = false;
+    for (int i = 1; i < dataBlkNum; i++) {
+      if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
+        cellIdxInBlk++;
+        crossStripe = true;
+      }
+      results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
+          cellSize * cellIdxInBlk;
+    }
+
+    int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
+    results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
+    results[blkIdxInGroup].lengths.add(firstCellLen);
+    results[blkIdxInGroup].readLength += firstCellLen;
+
+    int i = (blkIdxInGroup + 1) % dataBlkNum;
+    for (int done = firstCellLen; done < len; done += cellSize) {
+      ReadPortion rp = results[i];
+      rp.offsetsInBuf.add(done + bufOffset);
+      final int readLen = Math.min(len - done, cellSize);
+      rp.lengths.add(readLen);
+      rp.readLength += readLen;
+      i = (i + 1) % dataBlkNum;
+    }
+    return results;
+  }
+
+  /**
+   * Get the next completed striped read task
+   *
+   * @return {@link StripedReadResult} indicating the state of the completed
+   *          read task and the block index of that task. If the method times
+   *          out without getting any completed read task, -1 is returned as
+   *          the block index.
+   * @throws InterruptedException if interrupted while waiting for a read task
+   */
+  public static StripedReadResult getNextCompletedStripedRead(
+      CompletionService<Void> readService, Map<Future<Void>,
+      Integer> futures, final long threshold) throws InterruptedException {
+    Preconditions.checkArgument(!futures.isEmpty());
+    Preconditions.checkArgument(threshold > 0);
+    Future<Void> future = null;
+    try {
+      future = readService.poll(threshold, TimeUnit.MILLISECONDS);
+      if (future != null) {
+        future.get();
+        return new StripedReadResult(futures.remove(future),
+            StripedReadResult.SUCCESSFUL);
+      } else {
+        return new StripedReadResult(StripedReadResult.TIMEOUT);
+      }
+    } catch (ExecutionException e) {
+      return new StripedReadResult(futures.remove(future),
+          StripedReadResult.FAILED);
+    } catch (CancellationException e) {
+      return new StripedReadResult(futures.remove(future),
+          StripedReadResult.CANCELLED);
+    }
+  }
+
+  /**
+   * This class represents the portion of I/O associated with each block in the
+   * striped block group.
+   */
+  public static class ReadPortion {
+    /**
+     * startOffsetInBlock
+     *     |
+     *     v
+     *     |<-lengths[0]->|<-  lengths[1]  ->|<-lengths[2]->|
+     * +------------------+------------------+----------------+
+     * |      cell_0      |      cell_3      |     cell_6     |  <- blk_0
+     * +------------------+------------------+----------------+
+     *   _/                \_______________________
+     *  |                                          |
+     *  v offsetsInBuf[0]                          v offsetsInBuf[1]
+     * +------------------------------------------------------+
+     * |  cell_0     |      cell_1 and cell_2      |cell_3 ...|   <- buf
+     * |  (partial)  |    (from blk_1 and blk_2)   |          |
+     * +------------------------------------------------------+
+     */
+    public long startOffsetInBlock = 0;
+    public int readLength = 0;
+    public final List<Integer> offsetsInBuf = new ArrayList<>();
+    public final List<Integer> lengths = new ArrayList<>();
+
+    public int[] getOffsets() {
+      int[] offsets = new int[offsetsInBuf.size()];
+      for (int i = 0; i < offsets.length; i++) {
+        offsets[i] = offsetsInBuf.get(i);
+      }
+      return offsets;
+    }
+
+    public int[] getLengths() {
+      int[] lens = new int[this.lengths.size()];
+      for (int i = 0; i < lens.length; i++) {
+        lens[i] = this.lengths.get(i);
+      }
+      return lens;
+    }
+
+    public boolean containsReadPortion(ReadPortion rp) {
+      long end = startOffsetInBlock + readLength;
+      return startOffsetInBlock <= rp.startOffsetInBlock && end >=
+          rp.startOffsetInBlock + rp.readLength;
+    }
+  }
+
+  /**
+   * This class represents the result of a striped read request.
+   * Unless the request timed out, the index associated with the read task
+   * is also carried; a timeout result always has index -1.
+   */
+  public static class StripedReadResult {
+    public static final int SUCCESSFUL = 0x01;
+    public static final int FAILED = 0x02;
+    public static final int TIMEOUT = 0x04;
+    public static final int CANCELLED = 0x08;
+
+    public final int index;
+    public final int state;
+
+    public StripedReadResult(int state) {
+      Preconditions.checkArgument(state == TIMEOUT,
+          "Only timeout result should return negative index.");
+      this.index = -1;
+      this.state = state;
+    }
+
+    public StripedReadResult(int index, int state) {
+      Preconditions.checkArgument(state != TIMEOUT,
+          "Timeout result should return negative index.");
+      this.index = index;
+      this.state = state;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2e70cb9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
index cf84b30..3b5787a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
@@ -19,7 +19,8 @@ package org.apache.hadoop.hdfs;
 
 import org.junit.Test;
 
-import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
 import static org.junit.Assert.*;
 
 public class TestPlanReadPortions {
@@ -32,13 +33,13 @@ public class TestPlanReadPortions {
   private void testPlanReadPortions(int startInBlk, int length,
       int bufferOffset, int[] readLengths, int[] offsetsInBlock,
       int[][] bufferOffsets, int[][] bufferLengths) {
-    ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
+    ReadPortion[] results = StripedBlockUtil.planReadPortions(GROUP_SIZE,
         CELLSIZE, startInBlk, length, bufferOffset);
     assertEquals(GROUP_SIZE, results.length);
 
     for (int i = 0; i < GROUP_SIZE; i++) {
-      assertEquals(readLengths[i], results[i].getReadLength());
-      assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
+      assertEquals(readLengths[i], results[i].readLength);
+      assertEquals(offsetsInBlock[i], results[i].startOffsetInBlock);
       final int[] bOffsets = results[i].getOffsets();
       assertArrayEquals(bufferOffsets[i], bOffsets);
       final int[] bLengths = results[i].getLengths();
@@ -47,7 +48,7 @@ public class TestPlanReadPortions {
   }
 
   /**
-   * Test {@link DFSStripedInputStream#planReadPortions}
+   * Test {@link StripedBlockUtil#planReadPortions}
    */
   @Test
   public void testPlanReadPortions() {