You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/11 21:23:48 UTC
[48/50] hadoop git commit: HDFS-8203. Erasure Coding: Seek and other
Ops in DFSStripedInputStream. Contributed by Yi Liu.
HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream. Contributed by Yi Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de30e663
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de30e663
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de30e663
Branch: refs/heads/HDFS-7285
Commit: de30e663231de504303e4b2ba4aef32ee06b753b
Parents: 49d0ac8
Author: Jing Zhao <ji...@apache.org>
Authored: Thu May 7 11:06:40 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 11 11:40:59 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../hadoop/hdfs/DFSStripedInputStream.java | 88 +++++++++++++++++---
.../hadoop/hdfs/TestWriteReadStripedFile.java | 83 +++++++++++++++---
3 files changed, 151 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de30e663/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 11e8376..fed08e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -186,3 +186,6 @@
HDFS-8129. Erasure Coding: Maintain consistent naming for Erasure Coding related classes - EC/ErasureCoding
(umamahesh)
+
+ HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream.
+ (Yi Liu via jing9)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de30e663/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 7cb7b6d..9011192 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ByteBufferPool;
+
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
@@ -31,9 +34,11 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
+import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
@@ -263,6 +268,10 @@ public class DFSStripedInputStream extends DFSInputStream {
}
private long getOffsetInBlockGroup() {
+ return getOffsetInBlockGroup(pos);
+ }
+
+ private long getOffsetInBlockGroup(long pos) {
return pos - currentLocatedBlock.getStartOffset();
}
@@ -278,18 +287,22 @@ public class DFSStripedInputStream extends DFSInputStream {
// compute stripe range based on pos
final long offsetInBlockGroup = getOffsetInBlockGroup();
final long stripeLen = cellSize * dataBlkNum;
- int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
- curStripeRange = new StripeRange(stripeIndex * stripeLen,
- Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen),
- stripeLen));
- final int numCell = (int) ((curStripeRange.length - 1) / cellSize + 1);
+ final int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
+ final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
+ final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
+ - (stripeIndex * stripeLen), stripeLen);
+ curStripeRange = new StripeRange(offsetInBlockGroup,
+ stripeLimit - stripeBufOffset);
+
+ final int startCell = stripeBufOffset / cellSize;
+ final int numCell = (stripeLimit - 1) / cellSize + 1;
// read the whole stripe in parallel
Map<Future<Integer>, Integer> futures = new HashMap<>();
- for (int i = 0; i < numCell; i++) {
- curStripeBuf.position(cellSize * i);
- curStripeBuf.limit((int) Math.min(cellSize * (i + 1),
- curStripeRange.length));
+ for (int i = startCell; i < numCell; i++) {
+ int bufPos = i == startCell ? stripeBufOffset : cellSize * i;
+ curStripeBuf.position(bufPos);
+ curStripeBuf.limit(Math.min(cellSize * (i + 1), stripeLimit));
ByteBuffer buf = curStripeBuf.slice();
ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
final int targetLength = buf.remaining();
@@ -329,6 +342,39 @@ public class DFSStripedInputStream extends DFSInputStream {
};
}
+ /**
+ * Seek to a new arbitrary location
+ */
+ @Override
+ public synchronized void seek(long targetPos) throws IOException {
+ if (targetPos > getFileLength()) {
+ throw new EOFException("Cannot seek after EOF");
+ }
+ if (targetPos < 0) {
+ throw new EOFException("Cannot seek to negative offset");
+ }
+ if (closed.get()) {
+ throw new IOException("Stream is closed!");
+ }
+ if (targetPos <= blockEnd) {
+ final long targetOffsetInBlk = getOffsetInBlockGroup(targetPos);
+ if (curStripeRange.include(targetOffsetInBlk)) {
+ int bufOffset = getStripedBufOffset(targetOffsetInBlk);
+ curStripeBuf.position(bufOffset);
+ pos = targetPos;
+ return;
+ }
+ }
+ pos = targetPos;
+ blockEnd = -1;
+ }
+
+ private int getStripedBufOffset(long offsetInBlockGroup) {
+ final long stripeLen = cellSize * dataBlkNum;
+ // compute the position in the curStripeBuf based on "pos"
+ return (int) (offsetInBlockGroup % stripeLen);
+ }
+
@Override
protected synchronized int readWithStrategy(ReaderStrategy strategy,
int off, int len) throws IOException {
@@ -405,10 +451,8 @@ public class DFSStripedInputStream extends DFSInputStream {
* @return number of bytes copied
*/
private int copy(ReaderStrategy strategy, int offset, int length) {
- final long stripeLen = cellSize * dataBlkNum;
- final long offsetInBlk = pos - currentLocatedBlock.getStartOffset();
- // compute the position in the curStripeBuf based on "pos"
- int bufOffset = (int) (offsetInBlk % stripeLen);
+ final long offsetInBlk = getOffsetInBlockGroup();
+ int bufOffset = getStripedBufOffset(offsetInBlk);
curStripeBuf.position(bufOffset);
return strategy.copyFrom(curStripeBuf, offset,
Math.min(length, curStripeBuf.remaining()));
@@ -546,4 +590,22 @@ public class DFSStripedInputStream extends DFSInputStream {
}
throw new InterruptedException("let's retry");
}
+
+ /**
+ * May need online read recovery, zero-copy read doesn't make
+ * sense, so don't support it.
+ */
+ @Override
+ public synchronized ByteBuffer read(ByteBufferPool bufferPool,
+ int maxLength, EnumSet<ReadOption> opts)
+ throws IOException, UnsupportedOperationException {
+ throw new UnsupportedOperationException(
+ "Not support enhanced byte buffer access.");
+ }
+
+ @Override
+ public synchronized void releaseBuffer(ByteBuffer buffer) {
+ throw new UnsupportedOperationException(
+ "Not support enhanced byte buffer access.");
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de30e663/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index eacc6ed..5c6f449 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -22,12 +22,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -150,11 +150,35 @@ public class TestWriteReadStripedFile {
return bytes;
}
+ private int readAll(FSDataInputStream in, byte[] buf) throws IOException {
+ int readLen = 0;
+ int ret;
+ do {
+ ret = in.read(buf, readLen, buf.length - readLen);
+ if (ret > 0) {
+ readLen += ret;
+ }
+ } while (ret >= 0 && readLen < buf.length);
+ return readLen;
+ }
+
private byte getByte(long pos) {
final int mod = 29;
return (byte) (pos % mod + 1);
}
+ private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
+ int writeBytes) throws IOException {
+ fsdis.seek(pos);
+ byte[] buf = new byte[writeBytes];
+ int readLen = readAll(fsdis, buf);
+ Assert.assertEquals(readLen, writeBytes - pos);
+ for (int i = 0; i < readLen; i++) {
+ Assert.assertEquals("Byte at " + i + " should be the same",
+ getByte(pos + i), buf[i]);
+ }
+ }
+
private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
throws IOException {
Path testPath = new Path(src);
@@ -183,15 +207,7 @@ public class TestWriteReadStripedFile {
// stateful read with byte array
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
byte[] buf = new byte[writeBytes + 100];
- int readLen = 0;
- int ret;
- do {
- ret = fsdis.read(buf, readLen, buf.length - readLen);
- if (ret > 0) {
- readLen += ret;
- }
- } while (ret >= 0);
- readLen = readLen >= 0 ? readLen : 0;
+ int readLen = readAll(fsdis, buf);
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
@@ -200,6 +216,53 @@ public class TestWriteReadStripedFile {
}
}
+ // seek and stateful read
+ try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+ // seek to 1/2 of content
+ int pos = writeBytes/2;
+ assertSeekAndRead(fsdis, pos, writeBytes);
+
+ // seek to 1/3 of content
+ pos = writeBytes/3;
+ assertSeekAndRead(fsdis, pos, writeBytes);
+
+ // seek to 0 pos
+ pos = 0;
+ assertSeekAndRead(fsdis, pos, writeBytes);
+
+ if (writeBytes > cellSize) {
+ // seek to cellSize boundary
+ pos = cellSize -1;
+ assertSeekAndRead(fsdis, pos, writeBytes);
+ }
+
+ if (writeBytes > cellSize * dataBlocks) {
+ // seek to striped cell group boundary
+ pos = cellSize * dataBlocks - 1;
+ assertSeekAndRead(fsdis, pos, writeBytes);
+ }
+
+ if (writeBytes > blockSize * dataBlocks) {
+ // seek to striped block group boundary
+ pos = blockSize * dataBlocks - 1;
+ assertSeekAndRead(fsdis, pos, writeBytes);
+ }
+
+ try {
+ fsdis.seek(-1);
+ Assert.fail("Should be failed if seek to negative offset");
+ } catch (EOFException e) {
+ // expected
+ }
+
+ try {
+ fsdis.seek(writeBytes + 1);
+ Assert.fail("Should be failed if seek after EOF");
+ } catch (EOFException e) {
+ // expected
+ }
+ }
+
// stateful read with ByteBuffer
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);