You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/03 17:27:10 UTC

[iceberg] branch master updated: Arrow: Make ArrowReader iterator idempotent (#3148)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9798c4d  Arrow: Make ArrowReader iterator idempotent (#3148)
9798c4d is described below

commit 9798c4d2dffc960b9657a2b0ba7074781b1d87b8
Author: mayursrivastava <ma...@twosigma.com>
AuthorDate: Sun Oct 3 13:26:59 2021 -0400

    Arrow: Make ArrowReader iterator idempotent (#3148)
    
    Follow-up on https://github.com/apache/iceberg/pull/2933.
---
 .../iceberg/arrow/vectorized/ArrowReader.java      |  9 ++--
 .../iceberg/arrow/vectorized/ArrowReaderTest.java  | 49 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index f09774e..8869da4 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -204,7 +205,6 @@ public class ArrowReader extends CloseableGroup {
     private final int batchSize;
     private final boolean reuseContainers;
     private CloseableIterator<ColumnarBatch> currentIterator;
-    private ColumnarBatch current;
     private FileScanTask currentTask;
 
     /**
@@ -286,7 +286,6 @@ public class ArrowReader extends CloseableGroup {
       try {
         while (true) {
           if (currentIterator.hasNext()) {
-            this.current = currentIterator.next();
             return true;
           } else if (fileItr.hasNext()) {
             this.currentIterator.close();
@@ -308,7 +307,11 @@ public class ArrowReader extends CloseableGroup {
 
     @Override
     public ColumnarBatch next() {
-      return current;
+      if (hasNext()) { // safe: hasNext() no longer consumes a batch, so repeated calls are harmless
+        return currentIterator.next();
+      } else {
+        throw new NoSuchElementException(); // standard Iterator contract once all batches are consumed
+      }
     }
 
     CloseableIterator<ColumnarBatch> open(FileScanTask task) {
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
index 43f9df5..d729c98 100644
--- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
@@ -73,6 +73,7 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -300,6 +301,23 @@ public class ArrowReaderTest {
   }
 
   /**
+   * Asserts that {@link CloseableIterator#hasNext()} on the iterator returned by the {@link ArrowReader}
+   * is idempotent: extra calls before {@code next()} must not skip or repeat any batch.
+   */
+  @Test
+  public void testHasNextIsIdempotent() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan();
+    // Call hasNext() 0 extra times.
+    readAndCheckHasNextIsIdempotent(scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, 0, ALL_COLUMNS);
+    // Call hasNext() 1 extra time.
+    readAndCheckHasNextIsIdempotent(scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, 1, ALL_COLUMNS);
+    // Call hasNext() 2 extra times.
+    readAndCheckHasNextIsIdempotent(scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, 2, ALL_COLUMNS);
+  }
+
+  /**
    * Run the following verifications:
    * <ol>
    *   <li>Read the data and verify that the returned ColumnarBatches match expected rows.</li>
@@ -353,6 +371,37 @@ public class ArrowReaderTest {
     assertEquals(expectedTotalRows, totalRows);
   }
 
+  private void readAndCheckHasNextIsIdempotent(
+      TableScan scan,
+      int numRowsPerRoot,
+      int expectedTotalRows,
+      int numExtraCallsToHasNext,
+      List<String> columns) throws IOException {
+    Set<String> columnSet = ImmutableSet.copyOf(columns);
+    int rowIndex = 0;
+    int totalRows = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
+      CloseableIterator<ColumnarBatch> iterator = itr.iterator();
+      while (iterator.hasNext()) {
+        // Call hasNext() a few extra times.
+        // This should not affect the total number of rows read.
+        for (int i = 0; i < numExtraCallsToHasNext; i++) {
+          assertTrue(iterator.hasNext());
+        }
+
+        ColumnarBatch batch = iterator.next(); // must be the next batch in write order, not a later one
+        VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
+        assertEquals(createExpectedArrowSchema(columnSet), root.getSchema());
+        checkAllVectorTypes(root, columnSet);
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + numRowsPerRoot);
+        checkAllVectorValues(numRowsPerRoot, expectedRows, root, columnSet);
+        rowIndex += numRowsPerRoot; // assumes every batch holds exactly numRowsPerRoot rows
+        totalRows += root.getRowCount();
+      }
+    }
+    assertEquals(expectedTotalRows, totalRows); // extra hasNext() calls must not drop or repeat batches
+  }
+
   @SuppressWarnings("MethodLength")
   private void checkColumnarBatch(
       int expectedNumRows,