You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/03 17:27:10 UTC
[iceberg] branch master updated: Arrow: Make ArrowReader iterator idempotent (#3148)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9798c4d Arrow: Make ArrowReader iterator idempotent (#3148)
9798c4d is described below
commit 9798c4d2dffc960b9657a2b0ba7074781b1d87b8
Author: mayursrivastava <ma...@twosigma.com>
AuthorDate: Sun Oct 3 13:26:59 2021 -0400
Arrow: Make ArrowReader iterator idempotent (#3148)
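Follow-up to https://github.com/apache/iceberg/pull/2933. hasNext() no longer consumes a batch; next() now advances the underlying iterator (throwing NoSuchElementException when exhausted), so repeated hasNext() calls no longer skip batches.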
---
.../iceberg/arrow/vectorized/ArrowReader.java | 9 ++--
.../iceberg/arrow/vectorized/ArrowReaderTest.java | 49 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index f09774e..8869da4 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -204,7 +205,6 @@ public class ArrowReader extends CloseableGroup {
private final int batchSize;
private final boolean reuseContainers;
private CloseableIterator<ColumnarBatch> currentIterator;
- private ColumnarBatch current;
private FileScanTask currentTask;
/**
@@ -286,7 +286,6 @@ public class ArrowReader extends CloseableGroup {
try {
while (true) {
if (currentIterator.hasNext()) {
- this.current = currentIterator.next();
return true;
} else if (fileItr.hasNext()) {
this.currentIterator.close();
@@ -308,7 +307,11 @@ public class ArrowReader extends CloseableGroup {
@Override
public ColumnarBatch next() {
- return current;
+ if (hasNext()) {
+ return currentIterator.next();
+ } else {
+ throw new NoSuchElementException();
+ }
}
CloseableIterator<ColumnarBatch> open(FileScanTask task) {
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
index 43f9df5..d729c98 100644
--- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
@@ -73,6 +73,7 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -300,6 +301,23 @@ public class ArrowReaderTest {
}
/**
+ * The test asserts that {@link CloseableIterator#hasNext()} on the iterator
+ * returned by the {@link ArrowReader} is idempotent.
+ */
+ @Test
+ public void testHasNextIsIdempotent() throws Exception {
+ writeTableWithIncrementalRecords();
+ Table table = tables.load(tableLocation);
+ TableScan scan = table.newScan();
+ // Call hasNext() 0 extra times.
+ readAndCheckHasNextIsIdempotent(scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, 0, ALL_COLUMNS);
+ // Call hasNext() 1 extra time.
+ readAndCheckHasNextIsIdempotent(scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, 1, ALL_COLUMNS);
+ // Call hasNext() 2 extra times.
+ readAndCheckHasNextIsIdempotent(scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, 2, ALL_COLUMNS);
+ }
+
+ /**
* Run the following verifications:
* <ol>
* <li>Read the data and verify that the returned ColumnarBatches match expected rows.</li>
@@ -353,6 +371,37 @@ public class ArrowReaderTest {
assertEquals(expectedTotalRows, totalRows);
}
+ private void readAndCheckHasNextIsIdempotent(
+ TableScan scan,
+ int numRowsPerRoot,
+ int expectedTotalRows,
+ int numExtraCallsToHasNext,
+ List<String> columns) throws IOException {
+ Set<String> columnSet = ImmutableSet.copyOf(columns);
+ int rowIndex = 0;
+ int totalRows = 0;
+ try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
+ CloseableIterator<ColumnarBatch> iterator = itr.iterator();
+ while (iterator.hasNext()) {
+ // Call hasNext() a few extra times.
+ // This should not affect the total number of rows read.
+ for (int i = 0; i < numExtraCallsToHasNext; i++) {
+ assertTrue(iterator.hasNext());
+ }
+
+ ColumnarBatch batch = iterator.next();
+ VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
+ assertEquals(createExpectedArrowSchema(columnSet), root.getSchema());
+ checkAllVectorTypes(root, columnSet);
+ List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + numRowsPerRoot);
+ checkAllVectorValues(numRowsPerRoot, expectedRows, root, columnSet);
+ rowIndex += numRowsPerRoot;
+ totalRows += root.getRowCount();
+ }
+ }
+ assertEquals(expectedTotalRows, totalRows);
+ }
+
@SuppressWarnings("MethodLength")
private void checkColumnarBatch(
int expectedNumRows,