Posted to commits@orc.apache.org by do...@apache.org on 2023/03/09 20:47:24 UTC

[orc] branch branch-1.8 updated: ORC-1393: Add `reset(DiskRangeList input, long length)` to `InStream` impl class

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 21ea24b22 ORC-1393: Add `reset(DiskRangeList input, long length)` to `InStream` impl class
21ea24b22 is described below

commit 21ea24b22fb58e019252a38f6caa51186f43bd24
Author: Dmitriy Fingerman <df...@cloudera.com>
AuthorDate: Thu Mar 9 12:47:09 2023 -0800

    ORC-1393: Add `reset(DiskRangeList input, long length)` to `InStream` impl class
    
    ### What changes were proposed in this pull request?
    
    Allow the length of an `UncompressedStream` to be updated when `reset()` is called on it.
    
    ### Why are the changes needed?
    
    In short, after an `UncompressedStream` is reset, its actual length can be longer than its initial length.
    
    Before 'ORC-516 - Update InStream for column compression', the `InStream.UncompressedStream` class had a `length` field, and that length could be modified in the `reset()` method. ORC-516 removed this ability and is the root cause of the regression reported in HIVE-27128.
    
    The `reset()` method was used by the `setBuffers()` method of the `SettableUncompressedStream` class:
    
    ```java
    public void setBuffers(DiskRangeInfo diskRangeInfo) {
      reset(diskRangeInfo.getDiskRanges(), diskRangeInfo.getTotalLength());
      setOffset(diskRangeInfo.getDiskRanges());
    }
    ```
    `SettableUncompressedStream` was removed from the ORC code base, so since upgrading to ORC 1.6.7 Hive has managed its own copy of the class. That copy does not pass the new length to `UncompressedStream` when calling reset, because the `reset()` method of `UncompressedStream` no longer accepts one:
    
    ```java
    public void setBuffers(DiskRangeInfo diskRangeList) {
      reset(diskRangeList.getDiskRanges());
      setOffset(diskRangeList.getDiskRanges());
    }
    ```
    While investigating HIVE-27128, I compared the lengths of `InStream.UncompressedStream` before and after Hive's upgrade to ORC 1.6.7 (which included ORC-516). With the ORC-516 changes, the length of `InStream.UncompressedStream` is set once for all row groups. Without those changes, the length is dynamic and is sometimes set to a value larger than the initial one.
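
    The effect of a fixed versus resettable length can be illustrated with a toy model. This is only a sketch: `ToyStream` and `readAll()` are hypothetical names invented for illustration and are not part of ORC or Hive. The model assumes that reads stop once `position` reaches `length`, which is how the limit is described above.

    ```java
    /** Hypothetical toy model (not ORC code): a buffer plus a separately tracked length. */
    final class ToyStream {
      private byte[] data;
      private long length;
      private int position;

      ToyStream(byte[] data, long length) {
        reset(data, length);
      }

      /** Swap the buffer but keep the previous length (the behavior described after ORC-516). */
      void reset(byte[] newData) {
        this.data = newData;
        this.position = 0;
      }

      /** Swap the buffer and update the length (mirrors the overload added by this commit). */
      void reset(byte[] newData, long newLength) {
        this.length = newLength;
        reset(newData);
      }

      /** Consume bytes until the length limit or the end of the buffer; return the count. */
      int readAll() {
        int count = 0;
        while (position < length && position < data.length) {
          position++;
          count++;
        }
        return count;
      }
    }
    ```

    In this model, a stream created with length 5 and then reset onto a 10-byte buffer with the single-argument `reset` still stops after 5 bytes. Resetting it with the two-argument overload and a length of 10 lets all 10 bytes be read.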
    
    ### How was this patch tested?
    
    1. Passing CI pipeline tests
    2. Running the Hive Q-test https://github.com/difin/hive/commits/orc_read_err_qtest in a local dev environment, with Hive's SettableUncompressedStream changed to pass diskRangeList.getTotalLength() to the reset method of UncompressedStream.
    
    Closes #1432 from difin/ORC-1393.
    
    Authored-by: Dmitriy Fingerman <df...@cloudera.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit b56d2d123075e01ee97de4da62c49b5f436165a4)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../src/java/org/apache/orc/impl/InStream.java     | 24 +++++++++++-
 .../src/test/org/apache/orc/impl/TestInStream.java | 43 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 2 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index e6da6d44f..f0d3b1c35 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -42,7 +42,7 @@ public abstract class InStream extends InputStream {
 
   protected final Object name;
   protected final long offset;
-  protected final long length;
+  protected long length;
   protected DiskRangeList bytes;
   // position in the stream (0..length)
   protected long position;
@@ -87,6 +87,26 @@ public abstract class InStream extends InputStream {
     setCurrent(input, true);
   }
 
+  /**
+   * Reset the input to a new set of data with a different length.
+   *
+   * in some cases, after resetting an UncompressedStream, its actual length is longer than its initial length.
+   * Prior to ORC-516, InStream.UncompressedStream class had the 'length' field and the length was modifiable in
+   * the reset() method. It was used in SettableUncompressedStream class in setBuffers() method.
+   * SettableUncompressedStream was passing 'diskRangeInfo.getTotalLength()' as the length to the reset() method.
+   * SettableUncompressedStream had been removed from ORC code base, but it is required for Apache Hive and
+   * Apache Hive manages its own copy of SettableUncompressedStream since upgrading its Apache ORC version to 1.6.7.
+   * ORC-516 was the root cause of the regression reported in HIVE-27128 - EOFException when reading DATA stream.
+   * This wrapper method allows to resolve HIVE-27128.
+   *
+   * @param input the input data
+   * @param length new length of the stream
+   */
+  protected void reset(DiskRangeList input, long length) {
+    this.length = length;
+    reset(input);
+  }
+
   public abstract void changeIv(Consumer<byte[]> modifier);
 
   static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
@@ -122,7 +142,7 @@ public abstract class InStream extends InputStream {
                               long offset,
                               long length) {
       super(name, offset, length);
-      reset(input);
+      reset(input, length);
     }
 
     @Override
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index ddf3cdf13..06056a637 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -957,4 +957,47 @@ public class TestInStream {
       assertEquals((byte)i, inBuffer[i], "position " + i);
     }
   }
+
+  private static final byte[] uncompressed = input(
+          0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+  
+  @Test
+  public void testStreamResetWithIncreasedLength() throws IOException {
+    // Set up an initial buffer of PREVIOUS_LENGTH followed by our stream
+    // at START.
+    final long START = 1_000;
+    final int PREVIOUS_LENGTH = 30;
+    BufferChunkList list = new BufferChunkList();
+    byte[] previous = new byte[PREVIOUS_LENGTH];
+    Arrays.fill(previous, (byte) -1);
+    list.add(new BufferChunk(ByteBuffer.wrap(previous), START - PREVIOUS_LENGTH));
+    list.add(new BufferChunk(ByteBuffer.wrap(uncompressed), START));
+    // Creating a stream of 10 bytes, but with a length of 5
+    InStream inStream = InStream.create("test", list.get(), START, 5, new InStream.StreamOptions());
+    // Resetting the stream with the increased length
+    inStream.reset(list.get(), 10);
+    // Reading the stream and expecting to read 10 bytes
+    byte[] inBuffer = new byte[10];
+    assertEquals(10, inStream.read(inBuffer));
+  }
+
+  @Test
+  public void testStreamResetWithoutIncreasedLength() throws IOException {
+    // Set up an initial buffer of PREVIOUS_LENGTH followed by our stream
+    // at START.
+    final long START = 1_000;
+    final int PREVIOUS_LENGTH = 30;
+    BufferChunkList list = new BufferChunkList();
+    byte[] previous = new byte[PREVIOUS_LENGTH];
+    Arrays.fill(previous, (byte) -1);
+    list.add(new BufferChunk(ByteBuffer.wrap(previous), START - PREVIOUS_LENGTH));
+    list.add(new BufferChunk(ByteBuffer.wrap(uncompressed), START));
+    // Creating a stream of 10 bytes, but with a shorter length of 5
+    InStream inStream = InStream.create("test", list.get(), START, 5, new InStream.StreamOptions());
+    // Resetting the stream without updating its length
+    inStream.reset(list.get());
+    // Reading the stream and expecting to read 5 bytes as the initial stream length
+    byte[] inBuffer = new byte[5];
+    assertEquals(5, inStream.read(inBuffer));
+  }
 }