You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/11 21:23:30 UTC
[30/50] hadoop git commit: HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil. Contributed by Zhe Zhang.
HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil. Contributed by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2e70cb9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2e70cb9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2e70cb9
Branch: refs/heads/HDFS-7285
Commit: d2e70cb996e09488c7497bd3fd0548c7211ab592
Parents: a71bfe0
Author: Zhe Zhang <zh...@apache.org>
Authored: Wed Apr 29 23:49:52 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 11 11:36:22 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../hadoop/hdfs/DFSStripedInputStream.java | 111 +-----------
.../hadoop/hdfs/util/StripedBlockUtil.java | 174 +++++++++++++++++++
.../hadoop/hdfs/TestPlanReadPortions.java | 11 +-
4 files changed, 186 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2e70cb9/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 6a9bdee..ca60487 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -146,3 +146,6 @@
HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream
(stateful read). (Jing Zhao via Zhe Zhang)
+
+ HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil.
+ (Zhe Zhang)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2e70cb9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 3da7306..0dc98fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hdfs;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
+
import org.apache.hadoop.net.NetUtils;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -31,8 +33,6 @@ import org.apache.htrace.TraceScope;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
@@ -69,59 +69,6 @@ import java.util.concurrent.Future;
* 3. pread with decode support: TODO: will be supported after HDFS-7678
*****************************************************************************/
public class DFSStripedInputStream extends DFSInputStream {
- /**
- * This method plans the read portion from each block in the stripe
- * @param dataBlkNum The number of data blocks in the striping group
- * @param cellSize The size of each striping cell
- * @param startInBlk Starting offset in the striped block
- * @param len Length of the read request
- * @param bufOffset Initial offset in the result buffer
- * @return array of {@link ReadPortion}, each representing the portion of I/O
- * for an individual block in the group
- */
- @VisibleForTesting
- static ReadPortion[] planReadPortions(final int dataBlkNum,
- final int cellSize, final long startInBlk, final int len, int bufOffset) {
- ReadPortion[] results = new ReadPortion[dataBlkNum];
- for (int i = 0; i < dataBlkNum; i++) {
- results[i] = new ReadPortion();
- }
-
- // cellIdxInBlk is the index of the cell in the block
- // E.g., cell_3 is the 2nd cell in blk_0
- int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));
-
- // blkIdxInGroup is the index of the block in the striped block group
- // E.g., blk_2 is the 3rd block in the group
- final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
- results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
- startInBlk % cellSize;
- boolean crossStripe = false;
- for (int i = 1; i < dataBlkNum; i++) {
- if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
- cellIdxInBlk++;
- crossStripe = true;
- }
- results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
- cellSize * cellIdxInBlk;
- }
-
- int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
- results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
- results[blkIdxInGroup].lengths.add(firstCellLen);
- results[blkIdxInGroup].readLength += firstCellLen;
-
- int i = (blkIdxInGroup + 1) % dataBlkNum;
- for (int done = firstCellLen; done < len; done += cellSize) {
- ReadPortion rp = results[i];
- rp.offsetsInBuf.add(done + bufOffset);
- final int readLen = Math.min(len - done, cellSize);
- rp.lengths.add(readLen);
- rp.readLength += readLen;
- i = (i + 1) % dataBlkNum;
- }
- return results;
- }
private static class ReaderRetryPolicy {
private int fetchEncryptionKeyTimes = 1;
@@ -520,56 +467,4 @@ public class DFSStripedInputStream extends DFSInputStream {
}
throw new InterruptedException("let's retry");
}
-
-
- /**
- * This class represents the portion of I/O associated with each block in the
- * striped block group.
- */
- static class ReadPortion {
- /**
- * startOffsetInBlock
- * |
- * v
- * |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->|
- * +------------------+------------------+----------------+
- * | cell_0 | cell_3 | cell_6 | <- blk_0
- * +------------------+------------------+----------------+
- * _/ \_______________________
- * | |
- * v offsetsInBuf[0] v offsetsInBuf[1]
- * +------------------------------------------------------+
- * | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf
- * | (partial) | (from blk_1 and blk_2) | |
- * +------------------------------------------------------+
- */
- private long startOffsetInBlock = 0;
- private int readLength = 0;
- private final List<Integer> offsetsInBuf = new ArrayList<>();
- private final List<Integer> lengths = new ArrayList<>();
-
- int[] getOffsets() {
- int[] offsets = new int[offsetsInBuf.size()];
- for (int i = 0; i < offsets.length; i++) {
- offsets[i] = offsetsInBuf.get(i);
- }
- return offsets;
- }
-
- int[] getLengths() {
- int[] lens = new int[this.lengths.size()];
- for (int i = 0; i < lens.length; i++) {
- lens[i] = this.lengths.get(i);
- }
- return lens;
- }
-
- int getReadLength() {
- return readLength;
- }
-
- long getStartOffsetInBlock() {
- return startOffsetInBlock;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2e70cb9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index d622d4d..cb6d39a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.util;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -27,6 +28,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
/**
* Utility class for analyzing striped block groups
*/
@@ -134,4 +144,168 @@ public class StripedBlockUtil {
+ offsetInBlk % cellSize; // partial cell
}
+  /**
+   * Plans the portion of I/O each data block in a striping group must serve
+   * to satisfy a read of {@code len} bytes starting at offset
+   * {@code startInBlk} of the striped block group.
+   *
+   * Cells are assigned round-robin across the data blocks. The first
+   * (possibly partial) cell goes to the block containing {@code startInBlk},
+   * and each following cell of up to {@code cellSize} bytes goes to the next
+   * block in turn, wrapping around to block 0.
+   *
+   * @param dataBlkNum The number of data blocks in the striping group
+   * @param cellSize The size of each striping cell
+   * @param startInBlk Starting offset in the striped block group
+   * @param len Length of the read request
+   * @param bufOffset Initial offset in the result buffer
+   * @return array of {@link ReadPortion} with one entry per data block
+   *         (indexed by the block's position in the group), each describing
+   *         the I/O for that block
+   */
+  @VisibleForTesting
+  public static ReadPortion[] planReadPortions(final int dataBlkNum,
+      final int cellSize, final long startInBlk, final int len, int bufOffset) {
+    ReadPortion[] results = new ReadPortion[dataBlkNum];
+    for (int i = 0; i < dataBlkNum; i++) {
+      results[i] = new ReadPortion();
+    }
+
+    // Cell/offset arithmetic is done in long: with large block sizes an
+    // offset inside a single block can exceed Integer.MAX_VALUE, and an int
+    // product would silently wrap before being widened.
+    final long stripeSize = (long) cellSize * dataBlkNum;
+
+    // cellIdxInBlk is the index of the cell in the block
+    // E.g., with 3 data blocks, cell_3 is the 2nd cell in blk_0
+    long cellIdxInBlk = startInBlk / stripeSize;
+
+    // blkIdxInGroup is the index of the block in the striped block group
+    // E.g., blk_2 is the 3rd block in the group
+    final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
+    results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
+        startInBlk % cellSize;
+    // Blocks after blkIdxInGroup start reading at the same cell index; once
+    // the iteration wraps past the last block, the remaining blocks start in
+    // the next stripe, i.e. one cell further into the block.
+    boolean crossStripe = false;
+    for (int i = 1; i < dataBlkNum; i++) {
+      if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
+        cellIdxInBlk++;
+        crossStripe = true;
+      }
+      results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
+          cellSize * cellIdxInBlk;
+    }
+
+    // The first cell may be partial: read only up to the cell boundary.
+    int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
+    results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
+    results[blkIdxInGroup].lengths.add(firstCellLen);
+    results[blkIdxInGroup].readLength += firstCellLen;
+
+    // Hand out the remaining bytes one cell at a time, round-robin. Advancing
+    // by the bytes actually planned (never past len) rather than by a whole
+    // cellSize keeps `done` from overflowing when len is near
+    // Integer.MAX_VALUE.
+    int i = (blkIdxInGroup + 1) % dataBlkNum;
+    int done = firstCellLen;
+    while (done < len) {
+      final int readLen = Math.min(len - done, cellSize);
+      ReadPortion rp = results[i];
+      rp.offsetsInBuf.add(done + bufOffset);
+      rp.lengths.add(readLen);
+      rp.readLength += readLen;
+      done += readLen;
+      i = (i + 1) % dataBlkNum;
+    }
+    return results;
+  }
+
+  /**
+   * Waits up to {@code threshold} milliseconds for the next striped read task
+   * submitted to {@code readService} to complete, and reports its outcome.
+   *
+   * @param readService completion service the read tasks were submitted to
+   * @param futures map from each outstanding task's future to the index of
+   *                the block it reads; must not be empty. The entry for the
+   *                completed task is removed from this map.
+   * @param threshold maximum time to wait, in milliseconds; must be positive
+   * @return {@link StripedReadResult} holding the completed task's block
+   *         index and whether it succeeded, failed or was cancelled. If no
+   *         task completes before the timeout, the result's state is
+   *         {@link StripedReadResult#TIMEOUT} and its block index is -1.
+   * @throws InterruptedException if interrupted while waiting
+   * @throws NullPointerException if the completed task's future is not a key
+   *         of {@code futures}
+   */
+  public static StripedReadResult getNextCompletedStripedRead(
+      CompletionService<Void> readService, Map<Future<Void>,
+      Integer> futures, final long threshold) throws InterruptedException {
+    Preconditions.checkArgument(!futures.isEmpty());
+    Preconditions.checkArgument(threshold > 0);
+    Future<Void> future = readService.poll(threshold, TimeUnit.MILLISECONDS);
+    if (future == null) {
+      return new StripedReadResult(StripedReadResult.TIMEOUT);
+    }
+    int state;
+    try {
+      // The future was handed out by the completion service, so it is done
+      // and get() only reports how the task finished.
+      future.get();
+      state = StripedReadResult.SUCCESSFUL;
+    } catch (ExecutionException e) {
+      state = StripedReadResult.FAILED;
+    } catch (CancellationException e) {
+      state = StripedReadResult.CANCELLED;
+    }
+    return new StripedReadResult(removeBlockIndex(futures, future), state);
+  }
+
+  /**
+   * Removes {@code future} from {@code futures} and returns the block index it
+   * was mapped to. An untracked future fails with a descriptive
+   * NullPointerException instead of an anonymous unboxing NPE.
+   */
+  private static int removeBlockIndex(Map<Future<Void>, Integer> futures,
+      Future<Void> future) {
+    return Preconditions.checkNotNull(futures.remove(future),
+        "Completed striped read task is not tracked in the futures map");
+  }
+
+  /**
+   * The I/O one data block contributes to a striped read: where reading
+   * starts inside the block, how many bytes are read from it in total, and
+   * where each contiguous chunk is placed in the caller's buffer.
+   *
+   * <pre>
+   *   startOffsetInBlock
+   *   |
+   *   v
+   *   |<-lengths[0]->|  |<- lengths[1] ->|  |<-lengths[2]->|
+   * +------------------+------------------+----------------+
+   * |      cell_0      |      cell_3      |     cell_6     |  <- blk_0
+   * +------------------+------------------+----------------+
+   *   _/                \_______________________
+   *  |                                          |
+   *  v offsetsInBuf[0]                          v offsetsInBuf[1]
+   * +------------------------------------------------------+
+   * |  cell_0     |      cell_1 and cell_2      |cell_3 ...|  <- buf
+   * |  (partial)  |    (from blk_1 and blk_2)   |          |
+   * +------------------------------------------------------+
+   * </pre>
+   */
+  public static class ReadPortion {
+    /** Offset inside this block at which reading starts. */
+    public long startOffsetInBlock = 0;
+    /** Total number of bytes to read from this block. */
+    public int readLength = 0;
+    /** Buffer offset of each chunk read from this block. */
+    public final List<Integer> offsetsInBuf = new ArrayList<>();
+    /** Length of each chunk; entry k pairs with {@code offsetsInBuf[k]}. */
+    public final List<Integer> lengths = new ArrayList<>();
+
+    /** Returns the buffer offsets as a newly allocated primitive array. */
+    public int[] getOffsets() {
+      return toIntArray(offsetsInBuf);
+    }
+
+    /** Returns the chunk lengths as a newly allocated primitive array. */
+    public int[] getLengths() {
+      return toIntArray(lengths);
+    }
+
+    /**
+     * Returns true when the byte range of {@code rp} inside the block lies
+     * entirely within this portion's byte range (both ends inclusive).
+     */
+    public boolean containsReadPortion(ReadPortion rp) {
+      final long ownEnd = startOffsetInBlock + readLength;
+      final long otherEnd = rp.startOffsetInBlock + rp.readLength;
+      return rp.startOffsetInBlock >= startOffsetInBlock && otherEnd <= ownEnd;
+    }
+
+    /** Unboxes a list of integers into a fresh int array, preserving order. */
+    private static int[] toIntArray(List<Integer> values) {
+      final int[] out = new int[values.size()];
+      int pos = 0;
+      for (Integer v : values) {
+        out[pos++] = v;
+      }
+      return out;
+    }
+  }
+
+  /**
+   * Outcome of waiting for a striped read task. {@link #state} is one of
+   * {@link #SUCCESSFUL}, {@link #FAILED}, {@link #TIMEOUT} or
+   * {@link #CANCELLED}. For every state except {@link #TIMEOUT},
+   * {@link #index} carries the index supplied for the completed task; a
+   * timeout result always carries index -1.
+   */
+  public static class StripedReadResult {
+    public static final int SUCCESSFUL = 1;
+    public static final int FAILED = 1 << 1;
+    public static final int TIMEOUT = 1 << 2;
+    public static final int CANCELLED = 1 << 3;
+
+    /** Index associated with the completed task, or -1 for a timeout. */
+    public final int index;
+    /** One of the state constants of this class. */
+    public final int state;
+
+    /**
+     * Creates a result without an index.
+     *
+     * @param state must be {@link #TIMEOUT}
+     * @throws IllegalArgumentException for any other state
+     */
+    public StripedReadResult(int state) {
+      Preconditions.checkArgument(TIMEOUT == state,
+          "Only timeout result should return negative index.");
+      this.state = state;
+      this.index = -1;
+    }
+
+    /**
+     * Creates a result for a task that completed.
+     *
+     * @param index index associated with the task
+     * @param state any state except {@link #TIMEOUT}
+     * @throws IllegalArgumentException if {@code state} is {@link #TIMEOUT}
+     */
+    public StripedReadResult(int index, int state) {
+      Preconditions.checkArgument(TIMEOUT != state,
+          "Timeout result should return negative index.");
+      this.state = state;
+      this.index = index;
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2e70cb9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
index cf84b30..3b5787a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
@@ -19,7 +19,8 @@ package org.apache.hadoop.hdfs;
import org.junit.Test;
-import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
import static org.junit.Assert.*;
public class TestPlanReadPortions {
@@ -32,13 +33,13 @@ public class TestPlanReadPortions {
private void testPlanReadPortions(int startInBlk, int length,
int bufferOffset, int[] readLengths, int[] offsetsInBlock,
int[][] bufferOffsets, int[][] bufferLengths) {
- ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
+ ReadPortion[] results = StripedBlockUtil.planReadPortions(GROUP_SIZE,
CELLSIZE, startInBlk, length, bufferOffset);
assertEquals(GROUP_SIZE, results.length);
for (int i = 0; i < GROUP_SIZE; i++) {
- assertEquals(readLengths[i], results[i].getReadLength());
- assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
+ assertEquals(readLengths[i], results[i].readLength);
+ assertEquals(offsetsInBlock[i], results[i].startOffsetInBlock);
final int[] bOffsets = results[i].getOffsets();
assertArrayEquals(bufferOffsets[i], bOffsets);
final int[] bLengths = results[i].getLengths();
@@ -47,7 +48,7 @@ public class TestPlanReadPortions {
}
/**
- * Test {@link DFSStripedInputStream#planReadPortions}
+ * Test {@link StripedBlockUtil#planReadPortions}
*/
@Test
public void testPlanReadPortions() {