You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2021/03/23 21:23:03 UTC

[orc] branch master updated: ORC-758: Avoid seeking and decompressing of compressed stream, if the required seek is already satisfied by the current decompressed block

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new d68d6db  ORC-758: Avoid seeking and decompressing of compressed stream, if the required seek is already satisfied by the current decompressed block
d68d6db is described below

commit d68d6db859bd930f169f7979b21af47f3405e7c2
Author: Pavan Lanka <pl...@apple.com>
AuthorDate: Thu Mar 11 17:33:33 2021 -0800

    ORC-758: Avoid seeking and decompressing of compressed stream, if the required seek is
    already satisfied by the current decompressed block
    
    Fixes #652
    
    Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
 .../src/java/org/apache/orc/impl/InStream.java     |  50 ++++++---
 .../src/test/org/apache/orc/impl/TestInStream.java | 113 +++++++++++++++++++++
 2 files changed, 151 insertions(+), 12 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index a374dc2..53c2afa 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -397,13 +397,14 @@ public abstract class InStream extends InputStream {
     }
   }
 
-  private static class CompressedStream extends InStream {
+  public static class CompressedStream extends InStream {
     private final int bufferSize;
     private ByteBuffer uncompressed;
     private final CompressionCodec codec;
     protected ByteBuffer compressed;
     protected DiskRangeList currentRange;
     private boolean isUncompressedOriginal;
+    protected long currentCompressedStart = -1;
 
     /**
      * Create the stream without resetting the input stream.
@@ -471,6 +472,7 @@ public abstract class InStream extends InputStream {
     }
 
     private void readHeader() throws IOException {
+      currentCompressedStart = this.position;
       int b0 = readHeaderByte();
       int b1 = readHeaderByte();
       int b2 = readHeaderByte();
@@ -488,11 +490,14 @@ public abstract class InStream extends InputStream {
         isUncompressedOriginal = true;
       } else {
         if (isUncompressedOriginal) {
+          // Previous chunk was original (uncompressed): allocate a buffer and clear the flag
           allocateForUncompressed(bufferSize, slice.isDirect());
           isUncompressedOriginal = false;
         } else if (uncompressed == null) {
+          // The buffer has not been allocated yet, so allocate it
           allocateForUncompressed(bufferSize, slice.isDirect());
         } else {
+          // The buffer is already allocated, so just clear it for reuse
           uncompressed.clear();
         }
         codec.decompress(slice, uncompressed);
@@ -551,15 +556,25 @@ public abstract class InStream extends InputStream {
 
     @Override
     public void seek(PositionProvider index) throws IOException {
-      seek(index.getNext());
+      boolean seeked = seek(index.getNext());
       long uncompressedBytes = index.getNext();
-      if (uncompressedBytes != 0) {
-        readHeader();
-        uncompressed.position(uncompressed.position() +
-                              (int) uncompressedBytes);
-      } else if (uncompressed != null) {
-        // mark the uncompressed buffer as done
-        uncompressed.position(uncompressed.limit());
+      if (!seeked) {
+        if (uncompressed != null) {
+          // Only reposition uncompressed
+          uncompressed.position((int) uncompressedBytes);
+        }
+        // uncompressed is normally non-null here, since !seeked usually means a previous
+        // readHeader has taken place (it may be null when the stream has no bytes)
+      } else {
+        if (uncompressedBytes != 0) {
+          // A seek has taken place, so decompress the new chunk and position within it
+          readHeader();
+          uncompressed.position(uncompressed.position() +
+                                (int) uncompressedBytes);
+        } else if (uncompressed != null) {
+          // mark the uncompressed buffer as done
+          uncompressed.position(uncompressed.limit());
+        }
       }
     }
 
@@ -621,9 +636,20 @@ public abstract class InStream extends InputStream {
           chunkLength + " bytes");
     }
 
-    void seek(long desired) throws IOException {
+    /**
+     * Seek to the desired chunk based on the input position.
+     *
+     * @param desired position in the compressed stream
+     * @return Indicates whether a seek was performed or not
+     * @throws IOException when seeking outside the stream bounds
+     */
+    boolean seek(long desired) throws IOException {
       if (desired == 0 && bytes == null) {
-        return;
+        return false;
+      }
+      if (desired == currentCompressedStart) {
+        // Header already at the required position
+        return false;
       }
       long posn = desired + offset;
       for (DiskRangeList range = bytes; range != null; range = range.next) {
@@ -632,7 +658,7 @@ public abstract class InStream extends InputStream {
               posn < range.getEnd())) {
           position = desired;
           setCurrent(range, true);
-          return;
+          return true;
         }
       }
       throw new IOException("Seek outside of data in " + this + " to " + desired);
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index b5accb6..e6b05b5 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -19,6 +19,11 @@
 package org.apache.orc.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.DataInputStream;
@@ -441,6 +446,114 @@ public class TestInStream {
     }
   }
 
+  private long seekPosition(long prevPos,
+                            PositionCollector[] positions,
+                            int posIdx,
+                            InStream in,
+                            boolean needsSeek)
+    throws IOException {
+    if (needsSeek) {
+      assertNotEquals(prevPos, positions[posIdx].getNext());
+    } else {
+      assertEquals(prevPos, positions[posIdx].getNext());
+    }
+    positions[posIdx].reset();
+    ByteBuffer c = ((InStream.CompressedStream) in).compressed;
+    in.seek(positions[posIdx]);
+    assertEquals(posIdx & 0xff, in.read());
+    if (needsSeek) {
+      assertNotSame(c, ((InStream.CompressedStream) in).compressed);
+    } else {
+      assertSame(c, ((InStream.CompressedStream) in).compressed);
+    }
+    positions[posIdx].reset();
+    return positions[posIdx].getNext();
+  }
+
+  @Test
+  public void testCompressedSeeks() throws Exception {
+    // We test two scenarios: one where the stream is perfectly aligned with the DiskRange, and
+    // the other where it starts at an offset
+    for (int offset : new int[]{0, 10}) {
+      int compValues = 1024;
+      int origValues = 100;
+      PositionCollector[] positions = new PositionCollector[compValues + origValues];
+      byte[] compBytes = getCompressed(positions);
+      assertEquals(961, compBytes.length);
+      // Add an original chunk at the end
+      byte[] bytes = new byte[compBytes.length + 3 + origValues + offset];
+      System.arraycopy(compBytes, 0, bytes, offset, compBytes.length);
+      int startPos = offset + compBytes.length;
+      // Write original header
+      bytes[startPos] = (byte) ((origValues << 1) + 1);
+      bytes[startPos + 1] = (byte) (origValues >> 7);
+      bytes[startPos + 2] = (byte) (origValues >> 15);
+      for (int i = 0; i < 100; i++) {
+        positions[compValues + i] = new PositionCollector();
+        positions[compValues + i].addPosition(compBytes.length);
+        positions[compValues + i].addPosition(i);
+        bytes[startPos + 3 + i] = (byte) (compValues + i);
+      }
+      InStream in = InStream.create("test", new BufferChunk(ByteBuffer.wrap(bytes), 0), offset,
+                                    compBytes.length + 3 + origValues,
+                                    InStream.options()
+                                      .withCodec(new ZlibCodec())
+                                      .withBufferSize(300));
+      assertEquals("compressed stream test position: 0 length: 1064 range: 0" +
+                   String.format(" offset: %d limit: %d range 0 = 0 to %d",
+                                 offset,
+                                 bytes.length,
+                                 bytes.length),
+                   in.toString());
+
+      // Position at the last value
+      long currPos = positions[positions.length - 1].getNext();
+      positions[positions.length - 1].reset();
+      in.seek(positions[positions.length - 1]);
+
+      // Seek to the first should reposition compressed
+      currPos = seekPosition(currPos, positions, 0, in, true);
+      // Seek to next position should not require a seek
+      currPos = seekPosition(currPos, positions, 1, in, false);
+
+      // Seek to 301 which should require a seek
+      currPos = seekPosition(currPos, positions, 301, in, true);
+      // Seek to next position should not require a seek
+      seekPosition(currPos, positions, 302, in, false);
+
+      // Seek to 601 which should require a seek
+      currPos = seekPosition(currPos, positions, 601, in, true);
+      // Seek to next position should not require a seek
+      seekPosition(currPos, positions, 602, in, false);
+
+      // Seek to 1024, which should seek into the original (uncompressed) chunk
+      currPos = seekPosition(currPos, positions, 1024, in, true);
+      // Seek to next position should not require a seek
+      seekPosition(currPos, positions, 1025, in, false);
+      seekPosition(currPos, positions, 1026, in, false);
+    }
+  }
+
+  @Test
+  public void testInvalidSeek() throws Exception {
+    PositionCollector[] positions = new PositionCollector[1024];
+    byte[] bytes = getCompressed(positions);
+
+    assertEquals(961, bytes.length);
+    InStream in = InStream.create("test", new BufferChunk(ByteBuffer.wrap(bytes), 0), 0,
+                                  bytes.length, InStream.options().withCodec(new ZlibCodec()).withBufferSize(300));
+    assertEquals("compressed stream test position: 0 length: 961 range: 0" +
+                 " offset: 0 limit: 961 range 0 = 0 to 961",
+                 in.toString());
+
+    PositionCollector invalidPosition = new PositionCollector();
+    invalidPosition.addPosition(-1);
+    invalidPosition.addPosition(0);
+    in.seek(invalidPosition);
+    assertEquals(0, in.read());
+    assertEquals(1, in.read());
+  }
+
   @Test
   public void testCompressedPartial() throws Exception {
     PositionCollector[] positions = new PositionCollector[1024];