Posted to commits@orc.apache.org by pg...@apache.org on 2022/01/17 14:25:48 UTC

[orc] branch main updated: ORC-1078: Row group end offset doesn't accommodate all the blocks (#996)

This is an automated email from the ASF dual-hosted git repository.

pgaref pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 53e722f  ORC-1078: Row group end offset doesn't accommodate all the blocks (#996)
53e722f is described below

commit 53e722f4c6582a5be7b9b6ff19c94a32da16fa6e
Author: Yu-Wen <hs...@users.noreply.github.com>
AuthorDate: Mon Jan 17 06:25:16 2022 -0800

    ORC-1078: Row group end offset doesn't accommodate all the blocks (#996)
    
    ### What changes were proposed in this pull request?
    Calculate the row group end offset based on the buffer size. The current
    implementation stretches the slop by a constant factor of 2 to accommodate
    one more compression block beyond the current one, but that is not always
    enough when smaller buffer sizes are used.
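
    A minimal sketch of the new logic (illustration only, not part of the
    patch; the names are the ones introduced in the diff below):

        // Illustration: the new worst-case end-offset computation.
        static long endOffset(boolean isLast, long nextGroupOffset,
                              long streamLength, int bufferSize) {
          // stretchFactor = the current compression block plus however many
          // extra blocks the worst-case remaining run (MAX_VALUES_LENGTH
          // values of MAX_BYTE_WIDTH bytes each) can span.
          int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
          long slop = stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
          return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
        }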
    
    ### Why are the changes needed?
    When the compression size is smaller than 2048 and two blocks are not
    enough to hold all the values left in the last row group, we might end up
    seeking outside of the stream's range.
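
    For example, with bufferSize = 1024 a worst-case RLEv2 run of
    512 values * 8 bytes = 4096 bytes (the same 8 * 512 that appears in
    WORST_UNCOMPRESSED_SLOP below) can span ceil(4096 / 1024) = 4 following
    compression blocks, but the old slop of 2 * (HEADER_SIZE + 1024) bytes
    (about 2 KB) covers only two of them. The new formula instead yields
    stretchFactor = 2 + (4096 - 1) / 1024 = 5 blocks.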
    
    ### How was this patch tested?
    Added unit test TestRecordReaderImpl#testSmallCompressionSizeOrc.
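
    To reproduce locally, something like the following should work with the
    project's Maven build (module path and test selection assumed, not taken
    from this commit):

        cd java && mvn -pl core test -Dtest=TestRecordReaderImpl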
---
 .../org/apache/orc/impl/RecordReaderUtils.java     | 20 +++++---
 .../org/apache/orc/impl/SerializationUtils.java    |  2 +-
 .../org/apache/orc/impl/TestRecordReaderImpl.java  | 57 +++++++++++++++++++++-
 3 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index afdbde6..a39e1ed 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -180,13 +180,14 @@ public class RecordReaderUtils {
                                          boolean isLast,
                                          long nextGroupOffset,
                                          long streamLength) {
-    // figure out the worst case last location
-    // if adjacent groups have the same compressed block offset then stretch the slop
-    // by factor of 2 to safely accommodate the next compression block.
-    // One for the current compression block and another for the next compression block.
-    long slop = isCompressed
-                    ? 2 * (OutStream.HEADER_SIZE + bufferSize)
-                    : WORST_UNCOMPRESSED_SLOP;
+    // Figure out the worst case last location
+    long slop = WORST_UNCOMPRESSED_SLOP;
+    // Stretch the slop by a factor that safely accommodates the following compression
+    // blocks; the maximum number of blocks (stretchFactor) is derived from bufferSize.
+    if (isCompressed) {
+      int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+      slop = stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
+    }
     return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
   }
 
@@ -262,6 +263,11 @@ public class RecordReaderUtils {
   // for uncompressed streams, what is the most overlap with the following set
   // of rows (long vint literal group).
   static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+  // the maximum number of values that need to be consumed from the run
+  static final int MAX_VALUES_LENGTH = RunLengthIntegerWriterV2.MAX_SCOPE;
+  // the maximum byte width for each value
+  static final int MAX_BYTE_WIDTH =
+      SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal()) / 8;
 
   /**
    * Is this stream part of a dictionary?
diff --git a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
index 262a1e5..57fcd19 100644
--- a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
@@ -458,7 +458,7 @@ public final class SerializationUtils {
    * @param n - encoded fixed bit width
    * @return decoded fixed bit width
    */
-  public int decodeBitWidth(int n) {
+  public static int decodeBitWidth(int n) {
     if (n >= FixedBitSizes.ONE.ordinal()
         && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
       return n + 1;
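
Since decodeBitWidth is now static, the new MAX_BYTE_WIDTH constant can be
computed without a SerializationUtils instance. A quick sanity check (a
sketch only, assuming the orc-core classes are visible from the same
package; not part of the patch):

    // SIXTYFOUR decodes to 64 bits, i.e. 8 bytes per value, so the
    // worst-case run payload is MAX_VALUES_LENGTH * MAX_BYTE_WIDTH
    // = 512 * 8 = 4096 bytes.
    int maxByteWidth = SerializationUtils.decodeBitWidth(
        SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal()) / 8;
    assert maxByteWidth == 8;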
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index c9cdc1c..5e3c635 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -40,6 +40,7 @@ import org.apache.orc.ColumnStatistics;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.DataReader;
+import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.Reader;
@@ -78,6 +79,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.TimeZone;
 
+import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
+import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
 import static org.apache.orc.OrcFile.CURRENT_WRITER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1539,7 +1542,8 @@ public class TestRecordReaderImpl {
         new InStream.StreamOptions()
             .withCodec(OrcCodecPool.getCodec(CompressionKind.ZLIB))
             .withBufferSize(1024);
-    final int SLOP = 2 * (OutStream.HEADER_SIZE + options.getBufferSize());
+    int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / options.getBufferSize();
+    final int SLOP = stretchFactor * (OutStream.HEADER_SIZE + options.getBufferSize());
     MockDataReader dataReader = new MockDataReader(schema, options)
       .addStream(1, OrcProto.Stream.Kind.ROW_INDEX,
           createRowIndex(options,
@@ -2516,4 +2520,55 @@ public class TestRecordReaderImpl {
 
     assertEquals(TruthValue.NULL, whenNoHasValuesAndHasNullStatistics);
   }
+
+  @Test
+  public void testRgEndOffset() throws IOException {
+    for (int compressionSize = 64; compressionSize < 4096; compressionSize *= 2) {
+      testSmallCompressionSizeOrc(compressionSize);
+    }
+  }
+
+  private void testSmallCompressionSizeOrc(int compressionSize) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path(workDir, "smallCompressionSize.orc");
+    FileSystem.get(conf).delete(path, true);
+
+    TypeDescription schema = TypeDescription.fromString("struct<x:int>");
+    conf.setLong(OrcConf.BUFFER_SIZE.getAttribute(), compressionSize);
+    OrcFile.WriterOptions options = OrcFile.writerOptions(conf).setSchema(schema);
+    Writer writer = OrcFile.createWriter(path, options);
+    VectorizedRowBatch writeBatch = schema.createRowBatch();
+    LongColumnVector writeX = (LongColumnVector) writeBatch.cols[0];
+    for (int row = 0; row < 30_000; ++row) {
+      int idx = writeBatch.size++;
+      writeX.vector[idx] = row >= 10_000 && row < 20_000 ? row + 100_000 : row;
+      if (writeBatch.size == writeBatch.getMaxSize()) {
+        writer.addRowBatch(writeBatch);
+        writeBatch.reset();
+      }
+    }
+    if (writeBatch.size != 0) {
+      writer.addRowBatch(writeBatch);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    // only the second row group will be selected
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .startNot()
+        .lessThan("x", PredicateLeaf.Type.LONG, 100000L)
+        .end().build();
+    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
+    LongColumnVector readX = (LongColumnVector) batch.cols[0];
+    try (RecordReader rows = reader.rows(reader.options().searchArgument(sarg, null))) {
+      int row = 10_000;
+      while (rows.nextBatch(batch)) {
+        for (int i = 0; i < batch.size; i++) {
+          final int current_row = row++;
+          final int expectedVal = current_row >= 10_000 && current_row < 20_000 ? current_row + 100_000 : current_row;
+          assertEquals(expectedVal, readX.vector[i]);
+        }
+      }
+    }
+  }
 }