You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/05/28 19:59:22 UTC
[hudi] branch master updated: [HUDI-786] Fixing read beyond inline length in InlineFS (#1616)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5a0d3f1 [HUDI-786] Fixing read beyond inline length in InlineFS (#1616)
5a0d3f1 is described below
commit 5a0d3f1cf963e0061364d915ac86a465dd079bac
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Thu May 28 15:59:11 2020 -0400
[HUDI-786] Fixing read beyond inline length in InlineFS (#1616)
---
.../common/fs/inline/InLineFsDataInputStream.java | 26 +++++++++--
.../common/fs/inline/TestInLineFileSystem.java | 52 +++++++++++-----------
2 files changed, 49 insertions(+), 29 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java
index 1d316cd..4e87012 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java
@@ -37,15 +37,19 @@ public class InLineFsDataInputStream extends FSDataInputStream {
private final FSDataInputStream outerStream;
private final int length;
- public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) {
+ public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) throws IOException {
super(outerStream.getWrappedStream());
this.startOffset = startOffset;
this.outerStream = outerStream;
this.length = length;
+ outerStream.seek(startOffset);
}
@Override
public void seek(long desired) throws IOException {
+ if (desired > length) {
+ throw new IOException("Attempting to seek past inline content");
+ }
outerStream.seek(startOffset + desired);
}
@@ -56,24 +60,32 @@ public class InLineFsDataInputStream extends FSDataInputStream {
@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+ if ((length + offset) > this.length) {
+ throw new IOException("Attempting to read past inline content");
+ }
return outerStream.read(startOffset + position, buffer, offset, length);
}
@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+ if ((length + offset) > this.length) {
+ throw new IOException("Attempting to read past inline content");
+ }
outerStream.readFully(startOffset + position, buffer, offset, length);
}
@Override
public void readFully(long position, byte[] buffer)
throws IOException {
- outerStream.readFully(startOffset + position, buffer, 0, buffer.length);
+ readFully(position, buffer, 0, buffer.length);
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
- boolean toReturn = outerStream.seekToNewSource(startOffset + targetPos);
- return toReturn;
+ if (targetPos > this.length) {
+ throw new IOException("Attempting to seek past inline content");
+ }
+ return outerStream.seekToNewSource(startOffset + targetPos);
}
@Override
@@ -88,6 +100,9 @@ public class InLineFsDataInputStream extends FSDataInputStream {
@Override
public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
+ if (readahead > this.length) {
+ throw new IOException("Attempting to set read ahead past inline content");
+ }
outerStream.setReadahead(readahead);
}
@@ -99,6 +114,9 @@ public class InLineFsDataInputStream extends FSDataInputStream {
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
+ if (maxLength > this.length) {
+ throw new IOException("Attempting to read max length beyond inline content");
+ }
return outerStream.read(bufferPool, maxLength, opts);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
index 32394cb..4553aa5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -122,7 +121,6 @@ public class TestInLineFileSystem {
}
@Test
- @Disabled("Disabling flaky test for now https://issues.apache.org/jira/browse/HUDI-786")
public void testFileSystemApis() throws IOException {
OuterPathInfo outerPathInfo = generateOuterFileAndGetInfo(1000);
Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPathInfo.outerPath, outerPathInfo.startOffset, outerPathInfo.length);
@@ -130,10 +128,22 @@ public class TestInLineFileSystem {
final FSDataInputStream fsDataInputStream = inlineFileSystem.open(inlinePath);
byte[] actualBytes = new byte[outerPathInfo.expectedBytes.length];
// verify pos
- assertEquals(0 - outerPathInfo.startOffset, fsDataInputStream.getPos());
+ assertEquals(0, fsDataInputStream.getPos());
fsDataInputStream.readFully(0, actualBytes);
assertArrayEquals(outerPathInfo.expectedBytes, actualBytes);
+ // test seek
+ int[] validPositions = {1, 100, 290, 520, 990, 999, 1000};
+ for (int pos : validPositions) {
+ fsDataInputStream.seek(pos);
+ }
+ int[] invalidPositions = {1001, 1100, 10000};
+ for (int pos : invalidPositions) {
+ assertThrows(IOException.class, () -> {
+ fsDataInputStream.seek(pos);
+ }, "Should have thrown IOException");
+ }
+
// read partial data
// test read(long position, byte[] buffer, int offset, int length)
actualBytes = new byte[100];
@@ -143,9 +153,12 @@ public class TestInLineFileSystem {
fsDataInputStream.read(25, actualBytes, 100, 210);
verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
// give length to read > than actual inline content
- assertThrows(IndexOutOfBoundsException.class, () -> {
+ assertThrows(IOException.class, () -> {
fsDataInputStream.read(0, new byte[1100], 0, 1101);
- }, "Should have thrown IndexOutOfBoundsException");
+ }, "Should have thrown IOException");
+ assertThrows(IOException.class, () -> {
+ fsDataInputStream.read(0, new byte[10], 991, 10);
+ }, "Should have thrown IOException");
// test readFully(long position, byte[] buffer, int offset, int length)
actualBytes = new byte[100];
@@ -155,9 +168,12 @@ public class TestInLineFileSystem {
fsDataInputStream.readFully(25, actualBytes, 100, 210);
verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
// give length to read > than actual inline content
- assertThrows(IndexOutOfBoundsException.class, () -> {
+ assertThrows(IOException.class, () -> {
fsDataInputStream.readFully(0, new byte[1100], 0, 1101);
- }, "Should have thrown IndexOutOfBoundsException");
+ }, "Should have thrown IOException");
+ assertThrows(IOException.class, () -> {
+ fsDataInputStream.readFully(0, new byte[100], 910, 100);
+ }, "Should have thrown IOException");
// test readFully(long position, byte[] buffer)
actualBytes = new byte[100];
@@ -167,19 +183,10 @@ public class TestInLineFileSystem {
fsDataInputStream.readFully(25, actualBytes);
verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 0, 210);
// give length to read > than actual inline content
- actualBytes = new byte[1100];
- fsDataInputStream.readFully(0, actualBytes);
- verifyArrayEquality(outerPathInfo.expectedBytes, 0, 1000, actualBytes, 0, 1000);
-
- // TODO. seek does not move the position. need to investigate.
- // test seekToNewSource(long targetPos)
- /* fsDataInputStream.seekToNewSource(75);
- Assert.assertEquals(outerPathInfo.startOffset + 75, fsDataInputStream.getPos());
- fsDataInputStream.seekToNewSource(180);
- Assert.assertEquals(outerPathInfo.startOffset + 180, fsDataInputStream.getPos());
- fsDataInputStream.seekToNewSource(910);
- Assert.assertEquals(outerPathInfo.startOffset + 910, fsDataInputStream.getPos());
- */
+ assertThrows(IOException.class, () -> {
+ fsDataInputStream.readFully(0, new byte[1100]);
+ }, "Should have thrown IOException");
+
// test read(ByteBuffer buf)
ByteBuffer actualByteBuffer = ByteBuffer.allocate(100);
assertThrows(UnsupportedOperationException.class, () -> {
@@ -197,11 +204,6 @@ public class TestInLineFileSystem {
fsDataInputStream.setDropBehind(true);
}, "Should have thrown exception");
- // yet to test
- // read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
- // releaseBuffer(ByteBuffer buffer)
- // unbuffer()
-
fsDataInputStream.close();
}