You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/05/28 19:59:22 UTC

[hudi] branch master updated: [HUDI-786] Fixing read beyond inline length in InlineFS (#1616)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a0d3f1  [HUDI-786] Fixing read beyond inline length in InlineFS (#1616)
5a0d3f1 is described below

commit 5a0d3f1cf963e0061364d915ac86a465dd079bac
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Thu May 28 15:59:11 2020 -0400

    [HUDI-786] Fixing read beyond inline length in InlineFS (#1616)
---
 .../common/fs/inline/InLineFsDataInputStream.java  | 26 +++++++++--
 .../common/fs/inline/TestInLineFileSystem.java     | 52 +++++++++++-----------
 2 files changed, 49 insertions(+), 29 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java
index 1d316cd..4e87012 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java
@@ -37,15 +37,19 @@ public class InLineFsDataInputStream extends FSDataInputStream {
   private final FSDataInputStream outerStream;
   private final int length;
 
-  public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) {
+  public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) throws IOException {
     super(outerStream.getWrappedStream());
     this.startOffset = startOffset;
     this.outerStream = outerStream;
     this.length = length;
+    outerStream.seek(startOffset);
   }
 
   @Override
   public void seek(long desired) throws IOException {
+    if (desired > length) {
+      throw new IOException("Attempting to seek past inline content");
+    }
     outerStream.seek(startOffset + desired);
   }
 
@@ -56,24 +60,32 @@ public class InLineFsDataInputStream extends FSDataInputStream {
 
   @Override
   public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    if ((length + offset) > this.length) {
+      throw new IOException("Attempting to read past inline content");
+    }
     return outerStream.read(startOffset + position, buffer, offset, length);
   }
 
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    if ((length + offset) > this.length) {
+      throw new IOException("Attempting to read past inline content");
+    }
     outerStream.readFully(startOffset + position, buffer, offset, length);
   }
 
   @Override
   public void readFully(long position, byte[] buffer)
       throws IOException {
-    outerStream.readFully(startOffset + position, buffer, 0, buffer.length);
+    readFully(position, buffer, 0, buffer.length);
   }
 
   @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
-    boolean toReturn = outerStream.seekToNewSource(startOffset + targetPos);
-    return toReturn;
+    if (targetPos > this.length) {
+      throw new IOException("Attempting to seek past inline content");
+    }
+    return outerStream.seekToNewSource(startOffset + targetPos);
   }
 
   @Override
@@ -88,6 +100,9 @@ public class InLineFsDataInputStream extends FSDataInputStream {
 
   @Override
   public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
+    if (readahead > this.length) {
+      throw new IOException("Attempting to set read ahead past inline content");
+    }
     outerStream.setReadahead(readahead);
   }
 
@@ -99,6 +114,9 @@ public class InLineFsDataInputStream extends FSDataInputStream {
   @Override
   public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
       throws IOException, UnsupportedOperationException {
+    if (maxLength > this.length) {
+      throw new IOException("Attempting to read max length beyond inline content");
+    }
     return outerStream.read(bufferPool, maxLength, opts);
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
index 32394cb..4553aa5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -122,7 +121,6 @@ public class TestInLineFileSystem {
   }
 
   @Test
-  @Disabled("Disabling flaky test for now https://issues.apache.org/jira/browse/HUDI-786")
   public void testFileSystemApis() throws IOException {
     OuterPathInfo outerPathInfo = generateOuterFileAndGetInfo(1000);
     Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPathInfo.outerPath, outerPathInfo.startOffset, outerPathInfo.length);
@@ -130,10 +128,22 @@ public class TestInLineFileSystem {
     final FSDataInputStream fsDataInputStream = inlineFileSystem.open(inlinePath);
     byte[] actualBytes = new byte[outerPathInfo.expectedBytes.length];
     // verify pos
-    assertEquals(0 - outerPathInfo.startOffset, fsDataInputStream.getPos());
+    assertEquals(0, fsDataInputStream.getPos());
     fsDataInputStream.readFully(0, actualBytes);
     assertArrayEquals(outerPathInfo.expectedBytes, actualBytes);
 
+    // test seek
+    int[] validPositions = {1, 100, 290, 520, 990, 999, 1000};
+    for (int pos : validPositions) {
+      fsDataInputStream.seek(pos);
+    }
+    int[] invalidPositions = {1001, 1100, 10000};
+    for (int pos : invalidPositions) {
+      assertThrows(IOException.class, () -> {
+        fsDataInputStream.seek(pos);
+      }, "Should have thrown IOException");
+    }
+
     // read partial data
     // test read(long position, byte[] buffer, int offset, int length)
     actualBytes = new byte[100];
@@ -143,9 +153,12 @@ public class TestInLineFileSystem {
     fsDataInputStream.read(25, actualBytes, 100, 210);
     verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
     // give length to read > than actual inline content
-    assertThrows(IndexOutOfBoundsException.class, () -> {
+    assertThrows(IOException.class, () -> {
       fsDataInputStream.read(0, new byte[1100], 0, 1101);
-    }, "Should have thrown IndexOutOfBoundsException");
+    }, "Should have thrown IOException");
+    assertThrows(IOException.class, () -> {
+      fsDataInputStream.read(0, new byte[10], 991, 10);
+    }, "Should have thrown IOException");
 
     // test readFully(long position, byte[] buffer, int offset, int length)
     actualBytes = new byte[100];
@@ -155,9 +168,12 @@ public class TestInLineFileSystem {
     fsDataInputStream.readFully(25, actualBytes, 100, 210);
     verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 100, 210);
     // give length to read > than actual inline content
-    assertThrows(IndexOutOfBoundsException.class, () -> {
+    assertThrows(IOException.class, () -> {
       fsDataInputStream.readFully(0, new byte[1100], 0, 1101);
-    }, "Should have thrown IndexOutOfBoundsException");
+    }, "Should have thrown IOException");
+    assertThrows(IOException.class, () -> {
+      fsDataInputStream.readFully(0, new byte[100], 910, 100);
+    }, "Should have thrown IOException");
 
     // test readFully(long position, byte[] buffer)
     actualBytes = new byte[100];
@@ -167,19 +183,10 @@ public class TestInLineFileSystem {
     fsDataInputStream.readFully(25, actualBytes);
     verifyArrayEquality(outerPathInfo.expectedBytes, 25, 210, actualBytes, 0, 210);
     // give length to read > than actual inline content
-    actualBytes = new byte[1100];
-    fsDataInputStream.readFully(0, actualBytes);
-    verifyArrayEquality(outerPathInfo.expectedBytes, 0, 1000, actualBytes, 0, 1000);
-
-    // TODO. seek does not move the position. need to investigate.
-    // test seekToNewSource(long targetPos)
-    /* fsDataInputStream.seekToNewSource(75);
-    Assert.assertEquals(outerPathInfo.startOffset + 75, fsDataInputStream.getPos());
-    fsDataInputStream.seekToNewSource(180);
-    Assert.assertEquals(outerPathInfo.startOffset + 180, fsDataInputStream.getPos());
-    fsDataInputStream.seekToNewSource(910);
-    Assert.assertEquals(outerPathInfo.startOffset + 910, fsDataInputStream.getPos());
-    */
+    assertThrows(IOException.class, () -> {
+      fsDataInputStream.readFully(0, new byte[1100]);
+    }, "Should have thrown IOException");
+
     // test read(ByteBuffer buf)
     ByteBuffer actualByteBuffer = ByteBuffer.allocate(100);
     assertThrows(UnsupportedOperationException.class, () -> {
@@ -197,11 +204,6 @@ public class TestInLineFileSystem {
       fsDataInputStream.setDropBehind(true);
     }, "Should have thrown exception");
 
-    // yet to test
-    // read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
-    // releaseBuffer(ByteBuffer buffer)
-    // unbuffer()
-
     fsDataInputStream.close();
   }