You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by pg...@apache.org on 2022/01/17 14:49:04 UTC

[orc] branch branch-1.7 updated: ORC-1078: Row group end offset doesn't accommodate all the blocks (#996)

This is an automated email from the ASF dual-hosted git repository.

pgaref pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new ea986a7  ORC-1078: Row group end offset doesn't accommodate all the blocks (#996)
ea986a7 is described below

commit ea986a768fe74dd3fc5587024c44a89428d46ca0
Author: Yu-Wen <hs...@users.noreply.github.com>
AuthorDate: Mon Jan 17 16:25:16 2022 +0200

    ORC-1078: Row group end offset doesn't accommodate all the blocks (#996)
    
    ### What changes were proposed in this pull request?
    Calculate the row group end offset based on the buffer size. The current implementation stretches the slop by a constant factor of 2 to accommodate the following compression blocks, but that is not always sufficient when smaller buffer sizes are used.
    
    ### Why are the changes needed?
    When the compression buffer size is smaller than 2048 and two blocks are not enough to hold all the values left in the last row group, we might end up seeking outside of the range.
    
    ### How was this patch tested?
    Unit test added: TestRecordReaderImpl#testRgEndOffset, which runs testSmallCompressionSizeOrc for a range of small compression buffer sizes.
    
    Change-Id: Iff6b0468f582f09b1da7120d6154daf6aa7201f1
---
 .../org/apache/orc/impl/RecordReaderUtils.java     | 20 +++++---
 .../org/apache/orc/impl/SerializationUtils.java    |  2 +-
 .../org/apache/orc/impl/TestRecordReaderImpl.java  | 57 +++++++++++++++++++++-
 3 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index c63cabe..0d32c21 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -180,13 +180,14 @@ public class RecordReaderUtils {
                                          boolean isLast,
                                          long nextGroupOffset,
                                          long streamLength) {
-    // figure out the worst case last location
-    // if adjacent groups have the same compressed block offset then stretch the slop
-    // by factor of 2 to safely accommodate the next compression block.
-    // One for the current compression block and another for the next compression block.
-    long slop = isCompressed
-                    ? 2 * (OutStream.HEADER_SIZE + bufferSize)
-                    : WORST_UNCOMPRESSED_SLOP;
+    // Figure out the worst case last location
+    long slop = WORST_UNCOMPRESSED_SLOP;
+    // Stretch the slop by a factor to safely accommodate following compression blocks.
+    // We need to calculate the maximum number of blocks(stretchFactor) by bufferSize accordingly.
+    if (isCompressed) {
+      int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+      slop = stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
+    }
     return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
   }
 
@@ -262,6 +263,11 @@ public class RecordReaderUtils {
   // for uncompressed streams, what is the most overlap with the following set
   // of rows (long vint literal group).
   static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+  // the maximum number of values that need to be consumed from the run
+  static final int MAX_VALUES_LENGTH = RunLengthIntegerWriterV2.MAX_SCOPE;
+  // the maximum byte width for each value
+  static final int MAX_BYTE_WIDTH =
+      SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal()) / 8;
 
   /**
    * Is this stream part of a dictionary?
diff --git a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
index 1b41041..f4ceb92 100644
--- a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
@@ -434,7 +434,7 @@ public final class SerializationUtils {
    * @param n - encoded fixed bit width
    * @return decoded fixed bit width
    */
-  public int decodeBitWidth(int n) {
+  public static int decodeBitWidth(int n) {
     if (n >= FixedBitSizes.ONE.ordinal()
         && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
       return n + 1;
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index 93a55b2..e1ce5c8 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -40,6 +40,7 @@ import org.apache.orc.ColumnStatistics;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.DataReader;
+import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.Reader;
@@ -78,6 +79,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.TimeZone;
 
+import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
+import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -1538,7 +1541,8 @@ public class TestRecordReaderImpl {
         new InStream.StreamOptions()
             .withCodec(OrcCodecPool.getCodec(CompressionKind.ZLIB))
             .withBufferSize(1024);
-    final int SLOP = 2 * (OutStream.HEADER_SIZE + options.getBufferSize());
+    int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / options.getBufferSize();
+    final int SLOP = stretchFactor * (OutStream.HEADER_SIZE + options.getBufferSize());
     MockDataReader dataReader = new MockDataReader(schema, options)
       .addStream(1, OrcProto.Stream.Kind.ROW_INDEX,
           createRowIndex(options,
@@ -2449,4 +2453,55 @@ public class TestRecordReaderImpl {
     f2.setAccessible(true);
     assertFalse((boolean)f2.get(applier1));
   }
+
+  @Test
+  public void testRgEndOffset() throws IOException {
+    for (int compressionSize = 64; compressionSize < 4096; compressionSize *= 2) {
+      testSmallCompressionSizeOrc(compressionSize);
+    }
+  }
+
+  private void testSmallCompressionSizeOrc(int compressionSize) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path(workDir, "smallCompressionSize.orc");
+    FileSystem.get(conf).delete(path, true);
+
+    TypeDescription schema = TypeDescription.fromString("struct<x:int>");
+    conf.setLong(OrcConf.BUFFER_SIZE.getAttribute(), compressionSize);
+    OrcFile.WriterOptions options = OrcFile.writerOptions(conf).setSchema(schema);
+    Writer writer = OrcFile.createWriter(path, options);
+    VectorizedRowBatch writeBatch = schema.createRowBatch();
+    LongColumnVector writeX = (LongColumnVector) writeBatch.cols[0];
+    for (int row = 0; row < 30_000; ++row) {
+      int idx = writeBatch.size++;
+      writeX.vector[idx] = row >= 10_000 && row < 20_000 ? row + 100_000 : row;
+      if (writeBatch.size == writeBatch.getMaxSize()) {
+        writer.addRowBatch(writeBatch);
+        writeBatch.reset();
+      }
+    }
+    if (writeBatch.size != 0) {
+      writer.addRowBatch(writeBatch);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    // only the second row group will be selected
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .startNot()
+        .lessThan("x", PredicateLeaf.Type.LONG, 100000L)
+        .end().build();
+    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
+    LongColumnVector readX = (LongColumnVector) batch.cols[0];
+    try (RecordReader rows = reader.rows(reader.options().searchArgument(sarg, null))) {
+      int row = 10_000;
+      while (rows.nextBatch(batch)) {
+        for (int i = 0; i < batch.size; i++) {
+          final int current_row = row++;
+          final int expectedVal = current_row >= 10_000 && current_row < 20_000 ? current_row + 100_000 : current_row;
+          assertEquals(expectedVal, readX.vector[i]);
+        }
+      }
+    }
+  }
 }