You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by pg...@apache.org on 2022/01/17 14:39:50 UTC

[orc] branch branch-1.6 updated: ORC-1078: Row group end offset doesn't accommodate all the blocks (#996)

This is an automated email from the ASF dual-hosted git repository.

pgaref pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new ec4152f  ORC-1078: Row group end offset doesn't accommodate all the blocks (#996)
ec4152f is described below

commit ec4152f8ec9751c3070a282dc3e54c6b58a236cd
Author: Yu-Wen <hs...@users.noreply.github.com>
AuthorDate: Mon Jan 17 16:25:16 2022 +0200

    ORC-1078: Row group end offset doesn't accommodate all the blocks (#996)
    
    ### What changes were proposed in this pull request?
    To calculate the row group end offset based on the buffer size. Current implementation assumes stretching the slop by a constant factor of 2 to accommodate the compression block but thats not always the case when smaller buffer sizes are used.
    
    ### Why are the changes needed?
    When compression size is smaller than 2048 and two blocks are not enough for all the values left in last row group, we might end up seeking outside of range.
    
    ### How was this patch tested?
    Unit test added TestRecordReaderImpl#testSmallCompressionSizeOrc
    
    Change-Id: Ie78e321b891ebc88e9ffa91b7899596873ec8d1d
---
 .../org/apache/orc/impl/RecordReaderUtils.java     | 20 +++++---
 .../org/apache/orc/impl/SerializationUtils.java    |  2 +-
 .../org/apache/orc/impl/TestRecordReaderImpl.java  | 57 +++++++++++++++++++++-
 3 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 2d67875..da89fd3 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -181,13 +181,14 @@ public class RecordReaderUtils {
                                          boolean isLast,
                                          long nextGroupOffset,
                                          long streamLength) {
-    // figure out the worst case last location
-    // if adjacent groups have the same compressed block offset then stretch the slop
-    // by factor of 2 to safely accommodate the next compression block.
-    // One for the current compression block and another for the next compression block.
-    long slop = isCompressed
-                    ? 2 * (OutStream.HEADER_SIZE + bufferSize)
-                    : WORST_UNCOMPRESSED_SLOP;
+    // Figure out the worst case last location
+    long slop = WORST_UNCOMPRESSED_SLOP;
+    // Stretch the slop by a factor to safely accommodate following compression blocks.
+    // We need to calculate the maximum number of blocks(stretchFactor) by bufferSize accordingly.
+    if (isCompressed) {
+      int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+      slop = stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
+    }
     return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
   }
 
@@ -263,6 +264,11 @@ public class RecordReaderUtils {
   // for uncompressed streams, what is the most overlap with the following set
   // of rows (long vint literal group).
   static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+  // the maximum number of values that need to be consumed from the run
+  static final int MAX_VALUES_LENGTH = RunLengthIntegerWriterV2.MAX_SCOPE;
+  // the maximum byte width for each value
+  static final int MAX_BYTE_WIDTH =
+      SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal()) / 8;
 
   /**
    * Is this stream part of a dictionary?
diff --git a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
index c916feb..9bf5f3f 100644
--- a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
@@ -426,7 +426,7 @@ public final class SerializationUtils {
    * @param n - encoded fixed bit width
    * @return decoded fixed bit width
    */
-  public int decodeBitWidth(int n) {
+  public static int decodeBitWidth(int n) {
     if (n >= FixedBitSizes.ONE.ordinal()
         && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
       return n + 1;
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
index 4247733..57db2c7 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.orc.impl;
 
+import static org.apache.orc.impl.RecordReaderUtils.MAX_BYTE_WIDTH;
+import static org.apache.orc.impl.RecordReaderUtils.MAX_VALUES_LENGTH;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -68,6 +70,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
 import org.apache.orc.impl.reader.ReaderEncryption;
 import org.apache.orc.impl.reader.StripePlanner;
 import org.apache.orc.impl.writer.StreamOptions;
@@ -1534,7 +1537,8 @@ public class TestRecordReaderImpl {
         new InStream.StreamOptions()
             .withCodec(OrcCodecPool.getCodec(CompressionKind.ZLIB))
             .withBufferSize(1024);
-    final int SLOP = 2 * (OutStream.HEADER_SIZE + options.getBufferSize());
+    int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / options.getBufferSize();
+    final int SLOP = stretchFactor * (OutStream.HEADER_SIZE + options.getBufferSize());
     MockDataReader dataReader = new MockDataReader(schema, options)
       .addStream(1, OrcProto.Stream.Kind.ROW_INDEX,
           createRowIndex(options,
@@ -2445,4 +2449,55 @@ public class TestRecordReaderImpl {
     f2.setAccessible(true);
     assertFalse((boolean)f2.get(applier1));
   }
+
+  @Test
+  public void testRgEndOffset() throws IOException {
+    for (int compressionSize = 64; compressionSize < 4096; compressionSize *= 2) {
+      testSmallCompressionSizeOrc(compressionSize);
+    }
+  }
+
+  private void testSmallCompressionSizeOrc(int compressionSize) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path(workDir, "smallCompressionSize.orc");
+    FileSystem.get(conf).delete(path, true);
+
+    TypeDescription schema = TypeDescription.fromString("struct<x:int>");
+    conf.setLong(OrcConf.BUFFER_SIZE.getAttribute(), compressionSize);
+    OrcFile.WriterOptions options = OrcFile.writerOptions(conf).setSchema(schema);
+    Writer writer = OrcFile.createWriter(path, options);
+    VectorizedRowBatch writeBatch = schema.createRowBatch();
+    LongColumnVector writeX = (LongColumnVector) writeBatch.cols[0];
+    for (int row = 0; row < 30_000; ++row) {
+      int idx = writeBatch.size++;
+      writeX.vector[idx] = row >= 10_000 && row < 20_000 ? row + 100_000 : row;
+      if (writeBatch.size == writeBatch.getMaxSize()) {
+        writer.addRowBatch(writeBatch);
+        writeBatch.reset();
+      }
+    }
+    if (writeBatch.size != 0) {
+      writer.addRowBatch(writeBatch);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    // only the second row group will be selected
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .startNot()
+        .lessThan("x", PredicateLeaf.Type.LONG, 100000L)
+        .end().build();
+    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
+    LongColumnVector readX = (LongColumnVector) batch.cols[0];
+    try (RecordReader rows = reader.rows(reader.options().searchArgument(sarg, null))) {
+      int row = 10_000;
+      while (rows.nextBatch(batch)) {
+        for (int i = 0; i < batch.size; i++) {
+          final int current_row = row++;
+          final int expectedVal = current_row >= 10_000 && current_row < 20_000 ? current_row + 100_000 : current_row;
+          assertEquals(expectedVal, readX.vector[i]);
+        }
+      }
+    }
+  }
 }