Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/11/02 22:00:26 UTC
[iceberg] branch master updated: ORC: Fix non-vectorized reader incorrectly skipping rows (#1706)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6c22c24 ORC: Fix non-vectorized reader incorrectly skipping rows (#1706)
6c22c24 is described below
commit 6c22c243153e08d8212e2329475f1099b3fe9e32
Author: Shardul Mahadik <sm...@linkedin.com>
AuthorDate: Mon Nov 2 14:00:16 2020 -0800
ORC: Fix non-vectorized reader incorrectly skipping rows (#1706)
---
.../iceberg/data/orc/TestOrcRowIterator.java | 119 +++++++++++++++++++++
.../java/org/apache/iceberg/orc/OrcIterable.java | 7 +-
2 files changed, 124 insertions(+), 2 deletions(-)
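Background on the bug, for readers skimming the diff: OrcIterable's
non-vectorized row iterator pulls batches from a VectorizedRowBatchIterator
and bounds its row loop with current.size. The batch iterator reuses a
single VectorizedRowBatch and, judging from this fix, can refill that shared
buffer (for example when hasNext() pre-fetches the next batch) while rows of
the previous batch are still being handed out, so current.size silently
changes under the row loop. A minimal sketch of the hazard, with a
hypothetical LongBatch/LongBatchSource standing in for the real ORC classes:

    // Hypothetical stand-ins for VectorizedRowBatch and
    // VectorizedRowBatchIterator; only the buffer reuse matters here.
    final class LongBatch {
      int size;                            // rows currently valid in the buffer
      final long[] values = new long[1024];
    }

    final class LongBatchSource {
      private final LongBatch shared = new LongBatch(); // one reused buffer
      private final int[] batchSizes = {928, 1024};     // short batch, then a full one
      private int next = 0;
      private boolean advanced = false;

      private void advance() {
        if (!advanced && next < batchSizes.length) {
          shared.size = batchSizes[next++]; // refills the SAME object in place
          advanced = true;
        }
      }

      boolean hasNext() {
        advance();                          // pre-fetching mutates the shared buffer
        return advanced && shared.size > 0;
      }

      LongBatch next() {
        advance();
        advanced = false;
        return shared;                      // caller always sees the same instance
      }
    }

A row loop that holds the returned LongBatch and tests nextRow < batch.size
will see its bound jump from 928 to 1024 the moment hasNext() pre-fetches,
so the "batch exhausted, reset nextRow to 0" branch never fires and the
first 928 rows of the new batch are skipped. Caching the size at the moment
the batch is handed over, as the patch below does with currentBatchSize,
removes the dependence on the mutable field.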
diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
new file mode 100644
index 0000000..cde3f1e
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DataTestHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestOrcRowIterator {
+
+ private static final Schema DATA_SCHEMA = new Schema(
+ required(100, "id", Types.LongType.get())
+ );
+
+ private static final int NUM_ROWS = 8000;
+ private static final List<Record> DATA_ROWS;
+
+ static {
+ DATA_ROWS = Lists.newArrayListWithCapacity(NUM_ROWS);
+ for (long i = 0; i < NUM_ROWS; i++) {
+ Record row = GenericRecord.create(DATA_SCHEMA);
+ row.set(0, i);
+ DATA_ROWS.add(row);
+ }
+ }
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ private File testFile;
+
+ @Before
+ public void writeFile() throws IOException {
+ testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .schema(DATA_SCHEMA)
+ // write in such a way that the file contains 2 stripes each with 4 row groups of 1000 rows
+ .config("iceberg.orc.vectorbatch.size", "1000")
+ .config(OrcConf.ROW_INDEX_STRIDE.getAttribute(), "1000")
+ .config(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "4000")
+ .config(OrcConf.STRIPE_SIZE.getAttribute(), "1")
+ .build()) {
+ writer.addAll(DATA_ROWS);
+ }
+ }
+
+ @Test
+ public void testReadAllStripes() throws IOException {
+ // With default batch size of 1024, will read the following batches
+ // Stripe 1: 1024, 1024, 1024, 928
+ // Stripe 2: 1024, 1024, 1024, 928
+ readAndValidate(Expressions.alwaysTrue(), DATA_ROWS);
+ }
+
+ @Test
+ public void testReadFilteredRowGroupInMiddle() throws IOException {
+ // We skip the 2nd row group, rows [1000, 2000), in Stripe 1
+ // With default batch size of 1024, will read the following batches
+ // Stripe 1: 1000, 1024, 976
+ readAndValidate(Expressions.in("id", 500, 2500, 3500),
+ Lists.newArrayList(Iterables.concat(DATA_ROWS.subList(0, 1000), DATA_ROWS.subList(2000, 4000))));
+ }
+
+ private void readAndValidate(Expression filter, List<Record> expected) throws IOException {
+ List<Record> rows;
+ try (CloseableIterable<Record> reader = ORC.read(Files.localInput(testFile))
+ .project(DATA_SCHEMA)
+ .filter(filter)
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(DATA_SCHEMA, fileSchema))
+ .build()) {
+ rows = Lists.newArrayList(reader);
+ }
+
+ for (int i = 0; i < expected.size(); i += 1) {
+ DataTestHelpers.assertEquals(DATA_SCHEMA.asStruct(), expected.get(i), rows.get(i));
+ }
+ }
+}
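The writer settings above are what make the layout deterministic: a row
index stride of 1000 gives four row groups per 4000-row stripe, and the
1-byte stripe size target combined with a memory check every 4000 rows
forces a stripe flush at each check, yielding two 4000-row stripes. The
batch sizes predicted in the test comments then follow from simple
arithmetic, assuming the reader caps batches at the default 1024 rows and
never lets one span a stripe (or skipped row group) boundary. A quick
sanity check of that arithmetic:

    import java.util.ArrayList;
    import java.util.List;

    class BatchMath {
      // Batch sizes for one contiguous span of rows, capped at maxBatch.
      static List<Integer> expectedBatches(int rowsInSpan, int maxBatch) {
        List<Integer> sizes = new ArrayList<>();
        int remaining = rowsInSpan;
        while (remaining > 0) {
          int batch = Math.min(maxBatch, remaining);
          sizes.add(batch);
          remaining -= batch;
        }
        return sizes;
      }

      public static void main(String[] args) {
        System.out.println(expectedBatches(4000, 1024)); // [1024, 1024, 1024, 928]
        System.out.println(expectedBatches(1000, 1024)); // [1000]
        System.out.println(expectedBatches(2000, 1024)); // [1024, 976]
      }
    }

expectedBatches(4000, 1024) matches the per-stripe comment in
testReadAllStripes, and the two surviving spans in
testReadFilteredRowGroupInMiddle (rows 0-999 and 2000-3999) give the
1000 / 1024 / 976 sequence its comment predicts.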
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
index 2560f05..d3eaa41 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -133,6 +133,7 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private int nextRow;
private VectorizedRowBatch current;
+ private int currentBatchSize;
private final VectorizedRowBatchIterator batchIter;
private final OrcRowReader<T> reader;
@@ -142,18 +143,20 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
this.reader = reader;
current = null;
nextRow = 0;
+ currentBatchSize = 0;
}
@Override
public boolean hasNext() {
- return (current != null && nextRow < current.size) || batchIter.hasNext();
+ return (current != null && nextRow < currentBatchSize) || batchIter.hasNext();
}
@Override
public T next() {
- if (current == null || nextRow >= current.size) {
+ if (current == null || nextRow >= currentBatchSize) {
Pair<VectorizedRowBatch, Long> nextBatch = batchIter.next();
current = nextBatch.first();
+ currentBatchSize = current.size;
nextRow = 0;
this.reader.setBatchContext(nextBatch.second());
}
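With the fix applied, the previously failing sequence (a 928-row batch at a
stripe boundary followed by a full 1024-row batch) plays out like this,
assuming the batch iterator pre-fetches into its reused batch as sketched
earlier:

1. next() loads batch A; currentBatchSize = 928, nextRow = 0.
2. Rows 0..927 are returned and nextRow reaches 928.
3. hasNext(): nextRow < currentBatchSize is now false, so batchIter.hasNext()
   runs, pre-fetching batch B into the shared VectorizedRowBatch (current.size
   becomes 1024) and returning true.
4. next(): nextRow (928) >= currentBatchSize (928), so the iterator takes
   batch B from batchIter, resets nextRow to 0 and currentBatchSize to 1024,
   and no rows are skipped.

Before the change, step 4 compared nextRow against the already mutated
current.size of 1024, concluded it was still inside batch A, and returned
row 928 of batch B, silently dropping its first 928 rows.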