You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/08/09 17:36:41 UTC
[21/51] [abbrv] hadoop git commit: HADOOP-14722. Azure:
BlockBlobInputStream position incorrect after seek. Contributed by Thomas
Marquardt
HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek.
Contributed by Thomas Marquardt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d91b7a84
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d91b7a84
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d91b7a84
Branch: refs/heads/HADOOP-13345
Commit: d91b7a8451489f97bdde928cea774764155cfe03
Parents: 024c3ec
Author: Steve Loughran <st...@apache.org>
Authored: Sun Aug 6 20:19:23 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Sun Aug 6 20:19:23 2017 +0100
----------------------------------------------------------------------
.../hadoop/fs/azure/BlockBlobInputStream.java | 91 +++++++++++++++-----
.../fs/azure/TestBlockBlobInputStream.java | 85 ++++++++++++++++--
2 files changed, 150 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91b7a84/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
index 5542415..c37b2be 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
@@ -43,11 +43,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
private InputStream blobInputStream = null;
private int minimumReadSizeInBytes = 0;
private long streamPositionAfterLastRead = -1;
+ // position of next network read within stream
private long streamPosition = 0;
+ // length of stream
private long streamLength = 0;
private boolean closed = false;
+ // internal buffer, re-used for performance optimization
private byte[] streamBuffer;
+ // zero-based offset within streamBuffer of current read position
private int streamBufferPosition;
+ // length of data written to streamBuffer, streamBuffer may be larger
private int streamBufferLength;
/**
@@ -82,6 +87,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
}
/**
+ * Reset the internal stream buffer but do not release the memory.
+ * The buffer can be reused to avoid frequent memory allocations of
+ * a large buffer.
+ */
+ private void resetStreamBuffer() {
+ streamBufferPosition = 0;
+ streamBufferLength = 0;
+ }
+
+ /**
* Gets the read position of the stream.
* @return the zero-based byte offset of the read position.
* @throws IOException IO failure
@@ -89,7 +104,9 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
@Override
public synchronized long getPos() throws IOException {
checkState();
- return streamPosition;
+ return (streamBuffer != null)
+ ? streamPosition - streamBufferLength + streamBufferPosition
+ : streamPosition;
}
/**
@@ -107,21 +124,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
throw new EOFException(
FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
}
- if (pos == getPos()) {
+
+ // calculate offset between the target and current position in the stream
+ long offset = pos - getPos();
+
+ if (offset == 0) {
// no=op, no state change
return;
}
+ if (offset > 0) {
+ // forward seek, data can be skipped as an optimization
+ if (skip(offset) != offset) {
+ throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+ }
+ return;
+ }
+
+ // reverse seek, offset is negative
if (streamBuffer != null) {
- long offset = streamPosition - pos;
- if (offset > 0 && offset < streamBufferLength) {
- streamBufferPosition = streamBufferLength - (int) offset;
+ if (streamBufferPosition + offset >= 0) {
+ // target position is inside the stream buffer,
+ // only need to move backwards within the stream buffer
+ streamBufferPosition += offset;
} else {
- streamBufferPosition = streamBufferLength;
+ // target position is outside the stream buffer,
+ // need to reset stream buffer and move position for next network read
+ resetStreamBuffer();
+ streamPosition = pos;
}
+ } else {
+ streamPosition = pos;
}
- streamPosition = pos;
// close BlobInputStream after seek is invoked because BlobInputStream
// does not support seek
closeBlobInputStream();
@@ -189,8 +224,7 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
streamBuffer = new byte[(int) Math.min(minimumReadSizeInBytes,
streamLength)];
}
- streamBufferPosition = 0;
- streamBufferLength = 0;
+ resetStreamBuffer();
outputStream = new MemoryOutputStream(streamBuffer, streamBufferPosition,
streamBuffer.length);
needToCopy = true;
@@ -295,27 +329,44 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
* @throws IOException IO failure
+ * @throws IndexOutOfBoundsException if n is negative or if the sum of n
+ * and the current value of getPos() is greater than the length of the stream.
*/
@Override
public synchronized long skip(long n) throws IOException {
checkState();
if (blobInputStream != null) {
- return blobInputStream.skip(n);
- } else {
- if (n < 0 || streamPosition + n > streamLength) {
- throw new IndexOutOfBoundsException("skip range");
- }
+ // blobInputStream is open; delegate the work to it
+ long skipped = blobInputStream.skip(n);
+ // update position to the actual skip value
+ streamPosition += skipped;
+ return skipped;
+ }
- if (streamBuffer != null) {
- streamBufferPosition = (n < streamBufferLength - streamBufferPosition)
- ? streamBufferPosition + (int) n
- : streamBufferLength;
- }
+ // no blob stream; implement the skip logic directly
+ if (n < 0 || n > streamLength - getPos()) {
+ throw new IndexOutOfBoundsException("skip range");
+ }
+ if (streamBuffer != null) {
+ // there's a buffer, so seek with it
+ if (n < streamBufferLength - streamBufferPosition) {
+ // new range is in the buffer, so just update the buffer position
+ // skip within the buffer.
+ streamBufferPosition += (int) n;
+ } else {
+ // skip is out of range, so move position to new value and reset
+ // the buffer ready for the next read()
+ streamPosition = getPos() + n;
+ resetStreamBuffer();
+ }
+ } else {
+ // no stream buffer; increment the stream position ready for
+ // the next triggered connection & read
streamPosition += n;
- return n;
}
+ return n;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91b7a84/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
index 2453584..0ae4012 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
@@ -155,7 +155,7 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
}
LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
- TEST_FILE_SIZE );
+ TEST_FILE_SIZE);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
@@ -198,7 +198,7 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
}
@Test
- public void test_0200_BasicReadTestV2() throws Exception {
+ public void test_0200_BasicReadTest() throws Exception {
assumeHugeFileExists();
try (
@@ -214,12 +214,12 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
// v1 forward seek and read a kilobyte into first kilobyte of bufferV1
inputStreamV1.seek(5 * MEGABYTE);
int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE);
- assertEquals(numBytesReadV1, KILOBYTE);
+ assertEquals(KILOBYTE, numBytesReadV1);
// v2 forward seek and read a kilobyte into first kilobyte of bufferV2
inputStreamV2.seek(5 * MEGABYTE);
int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE);
- assertEquals(numBytesReadV2, KILOBYTE);
+ assertEquals(KILOBYTE, numBytesReadV2);
assertArrayEquals(bufferV1, bufferV2);
@@ -229,17 +229,90 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
// v1 reverse seek and read a megabyte into last megabyte of bufferV1
inputStreamV1.seek(3 * MEGABYTE);
numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len);
- assertEquals(numBytesReadV1, len);
+ assertEquals(len, numBytesReadV1);
// v2 reverse seek and read a megabyte into last megabyte of bufferV2
inputStreamV2.seek(3 * MEGABYTE);
numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len);
- assertEquals(numBytesReadV2, len);
+ assertEquals(len, numBytesReadV2);
assertArrayEquals(bufferV1, bufferV2);
}
}
+ @Test
+ public void test_0201_RandomReadTest() throws Exception {
+ assumeHugeFileExists();
+
+ try (
+ FSDataInputStream inputStreamV1
+ = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
+
+ FSDataInputStream inputStreamV2
+ = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
+ ) {
+ final int bufferSize = 4 * KILOBYTE;
+ byte[] bufferV1 = new byte[bufferSize];
+ byte[] bufferV2 = new byte[bufferV1.length];
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ inputStreamV1.seek(0);
+ inputStreamV2.seek(0);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ int seekPosition = 2 * KILOBYTE;
+ inputStreamV1.seek(seekPosition);
+ inputStreamV2.seek(seekPosition);
+
+ inputStreamV1.seek(0);
+ inputStreamV2.seek(0);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ seekPosition = 5 * KILOBYTE;
+ inputStreamV1.seek(seekPosition);
+ inputStreamV2.seek(seekPosition);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ seekPosition = 10 * KILOBYTE;
+ inputStreamV1.seek(seekPosition);
+ inputStreamV2.seek(seekPosition);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+ seekPosition = 4100 * KILOBYTE;
+ inputStreamV1.seek(seekPosition);
+ inputStreamV2.seek(seekPosition);
+
+ verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+ }
+ }
+
+ private void verifyConsistentReads(FSDataInputStream inputStreamV1,
+ FSDataInputStream inputStreamV2,
+ byte[] bufferV1,
+ byte[] bufferV2) throws IOException {
+ int size = bufferV1.length;
+ final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size);
+ assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
+
+ final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size);
+ assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
+
+ assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
+ }
+
/**
* Validates the implementation of InputStream.markSupported.
* @throws IOException
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org