You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/11/02 22:00:26 UTC

[iceberg] branch master updated: ORC: Fix non-vectorized reader incorrectly skipping rows (#1706)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c22c24  ORC: Fix non-vectorized reader incorrectly skipping rows (#1706)
6c22c24 is described below

commit 6c22c243153e08d8212e2329475f1099b3fe9e32
Author: Shardul Mahadik <sm...@linkedin.com>
AuthorDate: Mon Nov 2 14:00:16 2020 -0800

    ORC: Fix non-vectorized reader incorrectly skipping rows (#1706)
---
 .../iceberg/data/orc/TestOrcRowIterator.java       | 119 +++++++++++++++++++++
 .../java/org/apache/iceberg/orc/OrcIterable.java   |   7 +-
 2 files changed, 124 insertions(+), 2 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
new file mode 100644
index 0000000..cde3f1e
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DataTestHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * Tests reading generic records from an ORC file through {@code ORC.read}, both without a filter
+ * and with a filter that lets a row group in the middle of a stripe be skipped.
+ *
+ * <p>Every test reads the same single-column file of {@code NUM_ROWS} consecutive ids, which is
+ * rewritten before each test by {@link #writeFile()}.
+ */
+public class TestOrcRowIterator {
+
+  private static final Schema DATA_SCHEMA = new Schema(
+      required(100, "id", Types.LongType.get())
+  );
+
+  private static final int NUM_ROWS = 8000;
+
+  // Row i holds id == i, so a sub-list of DATA_ROWS is the expected result for a range of ids.
+  private static final List<Record> DATA_ROWS;
+
+  static {
+    DATA_ROWS = Lists.newArrayListWithCapacity(NUM_ROWS);
+    for (long i = 0; i < NUM_ROWS; i++) {
+      Record row = GenericRecord.create(DATA_SCHEMA);
+      row.set(0, i);
+      DATA_ROWS.add(row);
+    }
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private File testFile;
+
+  /**
+   * Writes {@code DATA_ROWS} to a fresh ORC file before each test.
+   *
+   * @throws IOException if the temporary file cannot be created or the writer fails
+   */
+  @Before
+  public void writeFile() throws IOException {
+    // newFile() creates the file on disk; delete it so that only the unused path is handed to the writer
+    testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
+        .schema(DATA_SCHEMA)
+        // write in such a way that the file contains 2 stripes each with 4 row groups of 1000 rows
+        .config("iceberg.orc.vectorbatch.size", "1000")
+        .config(OrcConf.ROW_INDEX_STRIDE.getAttribute(), "1000")
+        .config(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "4000")
+        .config(OrcConf.STRIPE_SIZE.getAttribute(), "1")
+        .build()) {
+      writer.addAll(DATA_ROWS);
+    }
+  }
+
+  @Test
+  public void testReadAllStripes() throws IOException {
+    // With default batch size of 1024, will read the following batches
+    // Stripe 1: 1024, 1024, 1024, 928
+    // Stripe 2: 1024, 1024, 1024, 928
+    readAndValidate(Expressions.alwaysTrue(), DATA_ROWS);
+  }
+
+  @Test
+  public void testReadFilteredRowGroupInMiddle() throws IOException {
+    // We skip the 2nd row group [1000, 2000) in Stripe 1
+    // The expected rows are whole row groups: [0, 1000), [2000, 3000) and [3000, 4000) each contain
+    // one of the filter values
+    // With default batch size of 1024, will read the following batches
+    // Stripe 1: 1000, 1024, 976
+    readAndValidate(Expressions.in("id", 500, 2500, 3500),
+        Lists.newArrayList(Iterables.concat(DATA_ROWS.subList(0, 1000), DATA_ROWS.subList(2000, 4000))));
+  }
+
+  /**
+   * Reads the test file with the given filter and checks the result against the expected rows.
+   *
+   * @param filter expression passed to the ORC read builder
+   * @param expected all rows the reader should return, in order
+   * @throws IOException if closing the reader fails
+   */
+  private void readAndValidate(Expression filter, List<Record> expected) throws IOException {
+    List<Record> rows;
+    try (CloseableIterable<Record> reader = ORC.read(Files.localInput(testFile))
+        .project(DATA_SCHEMA)
+        .filter(filter)
+        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(DATA_SCHEMA, fileSchema))
+        .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    // check the count first: extra rows would otherwise go unnoticed, and missing rows would surface
+    // as an IndexOutOfBoundsException instead of an assertion failure
+    Assert.assertEquals("Should read the expected number of rows", expected.size(), rows.size());
+    for (int i = 0; i < expected.size(); i += 1) {
+      DataTestHelpers.assertEquals(DATA_SCHEMA.asStruct(), expected.get(i), rows.get(i));
+    }
+  }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
index 2560f05..d3eaa41 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -133,6 +133,7 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
 
     private int nextRow;
     private VectorizedRowBatch current;
+    private int currentBatchSize;
 
     private final VectorizedRowBatchIterator batchIter;
     private final OrcRowReader<T> reader;
@@ -142,18 +143,20 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
       this.reader = reader;
       current = null;
       nextRow = 0;
+      currentBatchSize = 0;
     }
 
     @Override
     public boolean hasNext() {
-      return (current != null && nextRow < current.size) || batchIter.hasNext();
+      // Rows remain if the current batch still has unread rows or another batch can be fetched.
+      // The bound is currentBatchSize, which next() copies from current.size when it loads a batch,
+      // rather than current.size read here; presumably the batch's size field can change after the
+      // batch was handed out -- TODO confirm against VectorizedRowBatchIterator.
+      return (current != null && nextRow < currentBatchSize) || batchIter.hasNext();
     }
 
     @Override
     public T next() {
-      if (current == null || nextRow >= current.size) {
+      if (current == null || nextRow >= currentBatchSize) {
         Pair<VectorizedRowBatch, Long> nextBatch = batchIter.next();
         current = nextBatch.first();
+        currentBatchSize = current.size;
         nextRow = 0;
         this.reader.setBatchContext(nextBatch.second());
       }