You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/10 21:13:34 UTC

[iceberg] branch master updated: Spark: Close final reader in BaseDataIterator (#1563)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 25bc2b6  Spark: Close final reader in BaseDataIterator (#1563)
25bc2b6 is described below

commit 25bc2b6ab749fdd5c753ff9e794a5262c130f361
Author: mickjermsurawong-stripe <42...@users.noreply.github.com>
AuthorDate: Sat Oct 10 14:13:27 2020 -0700

    Spark: Close final reader in BaseDataIterator (#1563)
---
 .../iceberg/spark/source/BaseDataReader.java       |   1 +
 .../spark/source/TestSparkBaseDataReader.java      | 283 +++++++++++++++++++++
 .../spark/source/TestSparkBaseDataReader24.java    |  23 ++
 .../spark/source/TestSparkBaseDataReader3.java     |  23 ++
 4 files changed, 330 insertions(+)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index f2896db..60c22e8 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -84,6 +84,7 @@ abstract class BaseDataReader<T> implements Closeable {
         this.currentIterator.close();
         this.currentIterator = open(tasks.next());
       } else {
+        this.currentIterator.close();
         return false;
       }
     }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
new file mode 100644
index 0000000..51b47cb
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.FileFormat.PARQUET;
+import static org.apache.iceberg.Files.localOutput;
+
+public abstract class TestSparkBaseDataReader {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final Configuration CONFD = new Configuration();
+
+  // Stand-in for a task's data iterator: yields 0..range-1 and records whether close() was called
+  private static class CloseableIntegerRange implements CloseableIterator<Integer> {
+    boolean closed;
+    Iterator<Integer> iter;
+    // Narrowing cast below assumes range fits in an int (the tests in this class use 1 or 10)
+    CloseableIntegerRange(long range) {
+      this.closed = false;
+      this.iter = IntStream.range(0, (int) range).iterator();
+    }
+
+    @Override
+    public void close() {
+      this.closed = true;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }
+
+    @Override
+    public Integer next() {
+      return iter.next();
+    }
+  }
+
+  // Test reader used to exercise the iteration logic of the BaseDataReader base class.
+  // Remembers every iterator it opens, keyed by data file path, so tests can inspect closure.
+  private static class ClosureTrackingReader extends BaseDataReader<Integer> {
+    private final Map<String, CloseableIntegerRange> tracker = new HashMap<>();
+
+    ClosureTrackingReader(List<FileScanTask> tasks) {
+      super(new BaseCombinedScanTask(tasks),
+          new HadoopFileIO(CONFD),
+          new PlaintextEncryptionManager());
+    }
+
+    @Override
+    CloseableIterator<Integer> open(FileScanTask task) {
+      CloseableIntegerRange intRange = new CloseableIntegerRange(task.file().recordCount());
+      tracker.put(getKey(task), intRange);
+      return intRange;
+    }
+    // Throws NullPointerException when no iterator was opened for the task; check hasIterator first.
+    public boolean isIteratorClosed(FileScanTask task) {
+      return tracker.get(getKey(task)).closed;
+    }
+
+    public boolean hasIterator(FileScanTask task) {
+      return tracker.containsKey(getKey(task));
+    }
+
+    private String getKey(FileScanTask task) {
+      return task.file().path().toString();
+    }
+  }
+
+  @Test
+  public void testClosureOnDataExhaustion() throws IOException {
+    int totalTasks = 10;
+    int recordPerTask = 10;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+    // Drain the reader until next() reports that no records remain.
+    int countRecords = 0;
+    while (reader.next()) {
+      countRecords += 1;
+      Assert.assertNotNull("Reader should return non-null value", reader.get());
+    }
+
+    Assert.assertEquals("Reader returned incorrect number of records",
+        totalTasks * recordPerTask,
+        countRecords
+    );
+    tasks.forEach(t ->
+        Assert.assertTrue("All iterators should be closed after read exhaustion",
+            reader.isIteratorClosed(t))
+    );
+  }
+
+  @Test
+  public void testClosureDuringIteration() throws IOException {
+    Integer totalTasks = 2;
+    Integer recordPerTask = 1;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+    Assert.assertEquals(2, tasks.size());
+    FileScanTask firstTask = tasks.get(0);
+    FileScanTask secondTask = tasks.get(1);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+    // Two tasks with one record each, so two elements in total
+    Assert.assertTrue(reader.next());
+    Assert.assertFalse("First iter should not be closed on its last element",
+        reader.isIteratorClosed(firstTask));
+    // Advancing to the second element moves the reader on to the second task's iterator
+    Assert.assertTrue(reader.next());
+    Assert.assertTrue("First iter should be closed after moving to second iter",
+        reader.isIteratorClosed(firstTask));
+    Assert.assertFalse("Second iter should not be closed on its last element",
+        reader.isIteratorClosed(secondTask));
+    // No elements remain: next() must report false and leave both iterators closed
+    Assert.assertFalse(reader.next());
+    Assert.assertTrue(reader.isIteratorClosed(firstTask));
+    Assert.assertTrue(reader.isIteratorClosed(secondTask));
+  }
+
+  @Test
+  public void testClosureWithoutAnyRead() throws IOException {
+    Integer totalTasks = 10;
+    Integer recordPerTask = 10;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+    // Close immediately, before any call to next()
+    reader.close();
+
+    tasks.forEach(t ->
+        Assert.assertFalse("Iterator should not be created eagerly for tasks",
+            reader.hasIterator(t))
+    );
+  }
+
+  @Test
+  public void testExplicitClosure() throws IOException {
+    int totalTasks = 10;
+    int recordPerTask = 10;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+    // Read only half of the records so the reader is closed part-way through the data.
+    int halfDataSize = (totalTasks * recordPerTask) / 2;
+    for (int i = 0; i < halfDataSize; i++) {
+      Assert.assertTrue("Reader should have some element", reader.next());
+      Assert.assertNotNull("Reader should return non-null value", reader.get());
+    }
+
+    reader.close();
+
+    // Tasks that were never opened have no tracked iterator, so they are skipped here.
+    // Every iterator that was opened must have been closed.
+    tasks.forEach(t -> {
+      if (reader.hasIterator(t)) {
+        Assert.assertTrue("Iterator should be closed after explicit close",
+            reader.isIteratorClosed(t));
+      }
+    });
+  }
+
+  @Test
+  public void testIdempotentExplicitClosure() throws IOException {
+    int totalTasks = 10;
+    int recordPerTask = 10;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+    // Reading 45 of the 100 elements opens only the first 5 iterators
+    for (int i = 0; i < 45; i++) {
+      Assert.assertTrue("Reader should have some element", reader.next());
+      Assert.assertNotNull("Reader should return non-null value", reader.get());
+    }
+    // Repeated close() calls must leave the same observable state after every attempt.
+    for (int closeAttempt = 0; closeAttempt < 5; closeAttempt++) {
+      reader.close();
+      for (int i = 0; i < 5; i++) {
+        Assert.assertTrue("Iterator should be closed after explicit close",
+            reader.isIteratorClosed(tasks.get(i)));
+      }
+      for (int i = 5; i < 10; i++) {
+        Assert.assertFalse("Iterator should not be created eagerly for tasks",
+            reader.hasIterator(tasks.get(i)));
+      }
+    }
+  }
+
+  private List<FileScanTask> createFileScanTasks(Integer totalTasks, Integer recordPerTask) throws IOException {
+    String desc = "make_scan_tasks";
+    File parent = temp.newFolder(desc);
+    File location = new File(parent, "test");
+    File dataFolder = new File(location, "data");
+    Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
+
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get())
+    );
+    // TestTables.clearTables() runs in the finally block whether or not the setup below succeeds
+    try {
+      Table table = TestTables.create(location, desc, schema, PartitionSpec.unpartitioned());
+      // Important: use the table's schema for the rest of the test
+      // When tables are created, the column ids are reassigned.
+      Schema tableSchema = table.schema();
+      List<GenericData.Record> expected = RandomData.generateList(tableSchema, recordPerTask, 1L);
+      // Write totalTasks Parquet files, each holding the same generated records, and append them all
+      AppendFiles appendFiles = table.newAppend();
+      for (int i = 0; i < totalTasks; i++) {
+        File parquetFile = new File(dataFolder, PARQUET.addExtension(UUID.randomUUID().toString()));
+        try (FileAppender<GenericData.Record> writer = Parquet.write(localOutput(parquetFile))
+            .schema(tableSchema)
+            .build()) {
+          writer.addAll(expected);
+        }
+        DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
+            .withFileSizeInBytes(parquetFile.length())
+            .withPath(parquetFile.toString())
+            .withRecordCount(recordPerTask)
+            .build();
+        appendFiles.appendFile(file);
+      }
+      appendFiles.commit();
+
+      return StreamSupport
+          .stream(table.newScan().planFiles().spliterator(), false)
+          .collect(Collectors.toList());
+    } finally {
+      TestTables.clearTables();
+    }
+  }
+}
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader24.java
new file mode 100644
index 0000000..100c951
--- /dev/null
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader24.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+public class TestSparkBaseDataReader24 extends TestSparkBaseDataReader {
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader3.java
new file mode 100644
index 0000000..4322e34
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader3.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+public class TestSparkBaseDataReader3 extends TestSparkBaseDataReader {
+}