You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/10 21:13:34 UTC
[iceberg] branch master updated: Spark: Close final reader in
BaseDataIterator (#1563)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 25bc2b6 Spark: Close final reader in BaseDataIterator (#1563)
25bc2b6 is described below
commit 25bc2b6ab749fdd5c753ff9e794a5262c130f361
Author: mickjermsurawong-stripe <42...@users.noreply.github.com>
AuthorDate: Sat Oct 10 14:13:27 2020 -0700
Spark: Close final reader in BaseDataIterator (#1563)
---
.../iceberg/spark/source/BaseDataReader.java | 1 +
.../spark/source/TestSparkBaseDataReader.java | 283 +++++++++++++++++++++
.../spark/source/TestSparkBaseDataReader24.java | 23 ++
.../spark/source/TestSparkBaseDataReader3.java | 23 ++
4 files changed, 330 insertions(+)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index f2896db..60c22e8 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -84,6 +84,7 @@ abstract class BaseDataReader<T> implements Closeable {
this.currentIterator.close();
this.currentIterator = open(tasks.next());
} else {
+ this.currentIterator.close();
return false;
}
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
new file mode 100644
index 0000000..51b47cb
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.FileFormat.PARQUET;
+import static org.apache.iceberg.Files.localOutput;
+
+public abstract class TestSparkBaseDataReader {
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ // Hadoop configuration passed to the HadoopFileIO that backs ClosureTrackingReader.
+ private static final Configuration CONFD = new Configuration();
+
+ // Simulates the closeable iterator of data to be read
+ private static class CloseableIntegerRange implements CloseableIterator<Integer> {
+ boolean closed;
+ Iterator<Integer> iter;
+
+ CloseableIntegerRange(long range) {
+ this.closed = false;
+ this.iter = IntStream.range(0, (int) range).iterator();
+ }
+
+ @Override
+ public void close() {
+ this.closed = true;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Integer next() {
+ return iter.next();
+ }
+ }
+
+ // Minimal BaseDataReader subclass used to exercise the base class iteration logic.
+ // Remembers every iterator it opens, keyed by data file path, so tests can inspect closure.
+ private static class ClosureTrackingReader extends BaseDataReader<Integer> {
+ private Map<String, CloseableIntegerRange> opened = new HashMap<>();
+
+ ClosureTrackingReader(List<FileScanTask> tasks) {
+ super(new BaseCombinedScanTask(tasks),
+ new HadoopFileIO(CONFD),
+ new PlaintextEncryptionManager());
+ }
+
+ @Override
+ CloseableIterator<Integer> open(FileScanTask task) {
+ CloseableIntegerRange range = new CloseableIntegerRange(task.file().recordCount());
+ this.opened.put(pathOf(task), range);
+ return range;
+ }
+
+ public Boolean isIteratorClosed(FileScanTask task) {
+ return this.opened.get(pathOf(task)).closed;
+ }
+
+ public Boolean hasIterator(FileScanTask task) {
+ return this.opened.containsKey(pathOf(task));
+ }
+
+ private String pathOf(FileScanTask task) {
+ return task.file().path().toString();
+ }
+ }
+
+ @Test
+ public void testClosureOnDataExhaustion() throws IOException {
+ int totalTasks = 10;
+ int recordPerTask = 10;
+ List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+ // Reading until next() returns false must leave no iterator open, including the last one.
+ ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+ int countRecords = 0;
+ while (reader.next()) {
+ countRecords += 1;
+ Assert.assertNotNull("Reader should return non-null value", reader.get());
+ }
+
+ Assert.assertEquals("Reader returned incorrect number of records",
+ totalTasks * recordPerTask,
+ countRecords
+ );
+ tasks.forEach(t ->
+ Assert.assertTrue("All iterators should be closed after read exhaustion",
+ reader.isIteratorClosed(t))
+ );
+ }
+
+ @Test
+ public void testClosureDuringIteration() throws IOException {
+ Integer totalTasks = 2;
+ Integer recordPerTask = 1;
+ List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+ Assert.assertEquals(2, tasks.size());
+ FileScanTask firstTask = tasks.get(0);
+ FileScanTask secondTask = tasks.get(1);
+
+ ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+ // Total of 2 elements
+ Assert.assertTrue(reader.next());
+ Assert.assertFalse("First iter should not be closed on its last element",
+ reader.isIteratorClosed(firstTask));
+
+ Assert.assertTrue(reader.next());
+ Assert.assertTrue("First iter should be closed after moving to second iter",
+ reader.isIteratorClosed(firstTask));
+ Assert.assertFalse("Second iter should not be closed on its last element",
+ reader.isIteratorClosed(secondTask));
+
+ Assert.assertFalse(reader.next());
+ Assert.assertTrue(reader.isIteratorClosed(firstTask));
+ Assert.assertTrue(reader.isIteratorClosed(secondTask));
+ }
+
+ @Test
+ public void testClosureWithoutAnyRead() throws IOException {
+ Integer totalTasks = 10;
+ Integer recordPerTask = 10;
+ List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+ ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+ reader.close();
+
+ tasks.forEach(t ->
+ Assert.assertFalse("Iterator should not be created eagerly for tasks",
+ reader.hasIterator(t))
+ );
+ }
+
+ @Test
+ public void testExplicitClosure() throws IOException {
+ Integer totalTasks = 10;
+ Integer recordPerTask = 10;
+ List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+ ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+ Integer halfDataSize = (totalTasks * recordPerTask) / 2;
+ for (int i = 0; i < halfDataSize; i++) {
+ Assert.assertTrue("Reader should have some element", reader.next());
+ Assert.assertNotNull("Reader should return non-null value", reader.get());
+ }
+
+ reader.close();
+
+ // Some tasks might have not been opened yet, so we don't have corresponding tracker for it.
+ // But all that have been created must be closed.
+ tasks.forEach(t -> {
+ if (reader.hasIterator(t)) {
+ Assert.assertTrue("Iterator should be closed after read exhausion",
+ reader.isIteratorClosed(t));
+ }
+ });
+ }
+
+ @Test
+ public void testIdempotentExplicitClosure() throws IOException {
+ Integer totalTasks = 10;
+ Integer recordPerTask = 10;
+ List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+ ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+ // Total 100 elements, only 5 iterators have been created
+ for (int i = 0; i < 45; i++) {
+ Assert.assertTrue("eader should have some element", reader.next());
+ Assert.assertNotNull("Reader should return non-null value", reader.get());
+ }
+
+ for (int closeAttempt = 0; closeAttempt < 5; closeAttempt++) {
+ reader.close();
+ for (int i = 0; i < 5; i++) {
+ Assert.assertTrue("Iterator should be closed after read exhausion",
+ reader.isIteratorClosed(tasks.get(i)));
+ }
+ for (int i = 5; i < 10; i++) {
+ Assert.assertFalse("Iterator should not be created eagerly for tasks",
+ reader.hasIterator(tasks.get(i)));
+ }
+ }
+ }
+
+ private List<FileScanTask> createFileScanTasks(Integer totalTasks, Integer recordPerTask) throws IOException {
+ String tableName = "make_scan_tasks";
+ File root = temp.newFolder(tableName);
+ File tableDir = new File(root, "test");
+ File dataDir = new File(tableDir, "data");
+ Assert.assertTrue("mkdirs should succeed", dataDir.mkdirs());
+ // A single required long column suffices; the test reader never decodes the rows.
+ Schema schema = new Schema(
+ Types.NestedField.required(0, "id", Types.LongType.get())
+ );
+
+ try {
+ Table table = TestTables.create(tableDir, tableName, schema, PartitionSpec.unpartitioned());
+ // Table creation reassigns column ids, so everything below must go through
+ // the schema reported by the table rather than the one built above.
+ Schema tableSchema = table.schema();
+ List<GenericData.Record> rows = RandomData.generateList(tableSchema, recordPerTask, 1L);
+
+ AppendFiles append = table.newAppend();
+ for (int fileIdx = 0; fileIdx < totalTasks; fileIdx++) {
+ File parquetFile = new File(dataDir, PARQUET.addExtension(UUID.randomUUID().toString()));
+ try (FileAppender<GenericData.Record> appender = Parquet.write(localOutput(parquetFile))
+ .schema(tableSchema)
+ .build()) {
+ appender.addAll(rows);
+ }
+ DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
+ .withFileSizeInBytes(parquetFile.length())
+ .withPath(parquetFile.toString())
+ .withRecordCount(recordPerTask)
+ .build();
+ append.appendFile(dataFile);
+ }
+ append.commit();
+ // Hand back the scan tasks planned over the files committed above.
+ return StreamSupport
+ .stream(table.newScan().planFiles().spliterator(), false)
+ .collect(Collectors.toList());
+ } finally {
+ TestTables.clearTables();
+ }
+ }
+}
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader24.java
new file mode 100644
index 0000000..100c951
--- /dev/null
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader24.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+/** Concrete entry point under spark2 that inherits every test from TestSparkBaseDataReader. */
+public class TestSparkBaseDataReader24 extends TestSparkBaseDataReader {
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader3.java
new file mode 100644
index 0000000..4322e34
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader3.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+/** Concrete entry point under spark3 that inherits every test from TestSparkBaseDataReader. */
+public class TestSparkBaseDataReader3 extends TestSparkBaseDataReader {
+}