Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/07 23:08:32 UTC

[GitHub] [iceberg] mickjermsurawong-stripe opened a new pull request #1563: Close source data iterator for Spark data reader: fixing timeout due to S3A connection pool

mickjermsurawong-stripe opened a new pull request #1563:
URL: https://github.com/apache/iceberg/pull/1563


   - This PR closes the data iterator when it is exhausted in the Spark base data reader.
   - **Observation**: Using an S3 store, _coalesced_ reads see timeouts on HTTP pool connections from S3A. Our internal metrics show a constant number of pending HTTP pool connections. By contrast, reading the underlying S3 data path directly with a Parquet read (not Iceberg) shows zero pending connections.
   - **Cause**: The base reader for Spark does not close the data iterator when it is exhausted (see the sketch after this list).
   - **Question**: Why does this not happen more commonly in Iceberg reads (non-coalesced)?
     - In Spark, the data reader is closed after the Spark _task_ finishes, since it is registered in a task-completion callback: https://git.corp.stripe.com/stripe-private-oss-forks/spark/blob/stripe-2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala#L43
     - However, if the Spark read task has multiple partitions, the last data iterator for each partition is not closed. The iterators are only released when the whole task completes, so connections leak for the lifetime of a task.
     - In a normal read without coalescing, each read task corresponds to one Parquet file part, so finishing a Parquet file finishes the task and closes the connection.
     - A coalesced read reveals this edge case because each task is given more partitions to process.
   - **Testing**: I was only able to add unit tests for the iteration logic here, but internally we have an S3 mock which verified that this fix resolves the timeout issue.
   - I have checked the other two S3A timeout issues, https://github.com/apache/iceberg/pull/150 and https://github.com/apache/iceberg/pull/1474, as well as the mailing list, and believe that this is a distinct issue.
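
   A minimal sketch of the fixed `next()` loop (simplified for illustration, not the verbatim diff; the fields `tasks`, `currentIterator`, and `current` mirror the existing `BaseDataReader`):

   ```java
   // Simplified: advance within the current task's iterator, move on when a
   // task runs out, and, as the fix, close the final iterator once all tasks
   // are exhausted instead of leaving it open until task completion.
   public boolean next() throws IOException {
     while (true) {
       if (currentIterator.hasNext()) {
         this.current = currentIterator.next();
         return true;
       } else if (tasks.hasNext()) {
         this.currentIterator.close();               // done with this task's file
         this.currentIterator = open(tasks.next());
       } else {
         this.currentIterator.close();               // the fix: release the last iterator
         return false;
       }
     }
   }
   ```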
   
   cc @rdblue 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #1563: Close source data iterator for Spark data reader: fixing timeout due to S3A connection pool

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1563:
URL: https://github.com/apache/iceberg/pull/1563


   




[GitHub] [iceberg] mickjermsurawong-stripe commented on a change in pull request #1563: Close source data iterator for Spark data reader: fixing timeout due to S3A connection pool

Posted by GitBox <gi...@apache.org>.
mickjermsurawong-stripe commented on a change in pull request #1563:
URL: https://github.com/apache/iceberg/pull/1563#discussion_r501366043



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.CloseableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public abstract class TestSparkBaseDataReader {
+
+  private static final Configuration CONF = new Configuration();
+
+  // Simulates the closeable iterator of data to be read
+  private static class CloseableIntegerRange implements CloseableIterator<Integer> {

Review comment:
       Ah yup, will use Mockito here!






[GitHub] [iceberg] rdblue commented on a change in pull request #1563: Close source data iterator for Spark data reader: fixing timeout due to S3A connection pool

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1563:
URL: https://github.com/apache/iceberg/pull/1563#discussion_r501361848



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
##########
@@ -101,6 +101,7 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema
     File testFile = new File(dataFolder, format.addExtension(UUID.randomUUID().toString()));
 
     Table table = TestTables.create(location, desc, writeSchema, PartitionSpec.unpartitioned());
+

Review comment:
       Can you remove this? There is no need to modify this file.






[GitHub] [iceberg] rdblue commented on a change in pull request #1563: Close source data iterator for Spark data reader: fixing timeout due to S3A connection pool

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1563:
URL: https://github.com/apache/iceberg/pull/1563#discussion_r501365481



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.CloseableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public abstract class TestSparkBaseDataReader {
+
+  private static final Configuration CONF = new Configuration();
+
+  // Simulates the closeable iterator of data to be read
+  private static class CloseableIntegerRange implements CloseableIterator<Integer> {

Review comment:
       I think this is a good test to have, but I'm a little concerned about the number of classes it takes.
   
   Have you considered an alternative structure? What about using a `spy` object that is injected the same way, in the `ClosureTrackingReader.open` method? Then you'd be able to use normal file splits and planning with `TableTestBase`, and you'd be able to keep track of all the spies in the reader you instantiate.
   
   I think that would get rid of a few of these static classes, which would make this easier to understand and maintain.
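
   A rough sketch of that structure (assuming Mockito is on the test classpath; `realOpen` is a hypothetical stand-in for whatever produces the real iterator for a task):

   ```java
   // Keep a Mockito spy per opened task so a test can later verify close().
   private final Map<String, CloseableIterator<Integer>> spies = new HashMap<>();

   @Override
   CloseableIterator<Integer> open(FileScanTask task) {
     CloseableIterator<Integer> spy = Mockito.spy(realOpen(task)); // realOpen: hypothetical helper
     spies.put(task.file().path().toString(), spy);                // one spy per file path
     return spy;
   }

   // In a test, after driving the reader to exhaustion:
   //   Mockito.verify(spies.get(path)).close();
   ```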






[GitHub] [iceberg] rdblue commented on a change in pull request #1563: Close source data iterator for Spark data reader: fixing timeout due to S3A connection pool

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1563:
URL: https://github.com/apache/iceberg/pull/1563#discussion_r501361356



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
##########
@@ -84,6 +84,7 @@ public boolean next() throws IOException {
         this.currentIterator.close();
         this.currentIterator = open(tasks.next());
       } else {
+        this.currentIterator.close();

Review comment:
       Looks correct to me.






[GitHub] [iceberg] mickjermsurawong-stripe commented on a change in pull request #1563: Close source data iterator for Spark data reader: fixing timeout due to S3A connection pool

Posted by GitBox <gi...@apache.org>.
mickjermsurawong-stripe commented on a change in pull request #1563:
URL: https://github.com/apache/iceberg/pull/1563#discussion_r501388147



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.CloseableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public abstract class TestSparkBaseDataReader {
+
+  private static final Configuration CONF = new Configuration();
+
+  // Simulates the closeable iterator of data to be read
+  private static class CloseableIntegerRange implements CloseableIterator<Integer> {

Review comment:
       - I retain two helper classes here: one is the reader under test, and the other is this `CloseableIterator`.
   I was hoping I could somehow open the `FileScanTask` onto a spy of `CloseableIterator` and then check the invocation of `close`. However, I can't find any implementation of `CloseableIterator` readily available here, and any existing opening of a task into an iterator would also invoke other unneeded logic. As a result, this class `CloseableIntegerRange` still remains.
   - `TableTestBase` in core isn't actually visible from this test, so instead of mocking the file scan task, I create a dummy table and extract the planned files instead.






[GitHub] [iceberg] mickjermsurawong-stripe commented on a change in pull request #1563: Close source data iterator for Spark data reader: fixing timeout due to S3A connection pool

Posted by GitBox <gi...@apache.org>.
mickjermsurawong-stripe commented on a change in pull request #1563:
URL: https://github.com/apache/iceberg/pull/1563#discussion_r501389158



##########
File path: spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.FileFormat.PARQUET;
+import static org.apache.iceberg.Files.localOutput;
+
+public abstract class TestSparkBaseDataReader {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final Configuration CONFD = new Configuration();
+
+  // Simulates the closeable iterator of data to be read
+  private static class CloseableIntegerRange implements CloseableIterator<Integer> {
+    boolean closed;
+    Iterator<Integer> iter;
+
+    CloseableIntegerRange(long range) {
+      this.closed = false;
+      this.iter = IntStream.range(0, (int) range).iterator();
+    }
+
+    @Override
+    public void close() {
+      this.closed = true;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext();
+    }
+
+    @Override
+    public Integer next() {
+      return iter.next();
+    }
+  }
+
+  // Main reader class to test base class iteration logic.
+  // Keeps track of iterator closure.
+  private static class ClosureTrackingReader extends BaseDataReader<Integer> {
+    private Map<String, CloseableIntegerRange> tracker = new HashMap<>();
+
+    ClosureTrackingReader(List<FileScanTask> tasks) {
+      super(new BaseCombinedScanTask(tasks),
+          new HadoopFileIO(CONFD),
+          new PlaintextEncryptionManager());
+    }
+
+    @Override
+    CloseableIterator<Integer> open(FileScanTask task) {
+      CloseableIntegerRange intRange = new CloseableIntegerRange(task.file().recordCount());
+      tracker.put(getKey(task), intRange);
+      return intRange;
+    }
+
+    public Boolean isIteratorClosed(FileScanTask task) {
+      return tracker.get(getKey(task)).closed;
+    }
+
+    public Boolean hasIterator(FileScanTask task) {
+      return tracker.containsKey(getKey(task));
+    }
+
+    private String getKey(FileScanTask task) {
+      return task.file().path().toString();
+    }
+  }
+
+  @Test
+  public void testClosureOnDataExhaustion() throws IOException {
+    Integer totalTasks = 10;
+    Integer recordPerTask = 10;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+    int countRecords = 0;
+    while (reader.next()) {
+      countRecords += 1;
+      Assert.assertNotNull("Reader should return non-null value", reader.get());
+    }
+
+    Assert.assertEquals("Reader returned incorrect number of records",
+        totalTasks * recordPerTask,
+        countRecords
+    );
+    tasks.forEach(t ->
+        Assert.assertTrue("All iterators should be closed after read exhausion",
+            reader.isIteratorClosed(t))
+    );
+  }
+
+  @Test
+  public void testClosureDuringIteration() throws IOException {
+    Integer totalTasks = 2;
+    Integer recordPerTask = 1;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+    Assert.assertEquals(2, tasks.size());
+    FileScanTask firstTask = tasks.get(0);
+    FileScanTask secondTask = tasks.get(1);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+    // Total of 2 elements
+    Assert.assertTrue(reader.next());
+    Assert.assertFalse("First iter should not be closed on its last element",
+        reader.isIteratorClosed(firstTask));
+
+    Assert.assertTrue(reader.next());
+    Assert.assertTrue("First iter should be closed after moving to second iter",
+        reader.isIteratorClosed(firstTask));
+    Assert.assertFalse("Second iter should not be closed on its last element",
+        reader.isIteratorClosed(secondTask));
+
+    Assert.assertFalse(reader.next());
+    Assert.assertTrue(reader.isIteratorClosed(firstTask));
+    Assert.assertTrue(reader.isIteratorClosed(secondTask));
+  }
+
+  @Test
+  public void testClosureWithoutAnyRead() throws IOException {
+    Integer totalTasks = 10;
+    Integer recordPerTask = 10;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+    reader.close();
+
+    tasks.forEach(t ->
+        Assert.assertFalse("Iterator should not be created eagerly for tasks",
+            reader.hasIterator(t))
+    );
+  }
+
+  @Test
+  public void testExplicitClosure() throws IOException {
+    Integer totalTasks = 10;
+    Integer recordPerTask = 10;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+    Integer halfDataSize = (totalTasks * recordPerTask) / 2;
+    for (int i = 0; i < halfDataSize; i++) {
+      Assert.assertTrue("Reader should have some element", reader.next());
+      Assert.assertNotNull("Reader should return non-null value", reader.get());
+    }
+
+    reader.close();
+
+    // Some tasks may not have been opened yet, so there is no tracker for them,
+    // but every iterator that was created must be closed.
+    tasks.forEach(t -> {
+      if (reader.hasIterator(t)) {
+        Assert.assertTrue("Iterator should be closed after read exhausion",
+            reader.isIteratorClosed(t));
+      }
+    });
+  }
+
+  @Test
+  public void testIdempotentExplicitClosure() throws IOException {
+    Integer totalTasks = 10;
+    Integer recordPerTask = 10;
+    List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
+
+    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+
+    // Total of 100 elements; reading 45 opens only the first 5 iterators
+    for (int i = 0; i < 45; i++) {
+      Assert.assertTrue("Reader should have some element", reader.next());
+      Assert.assertNotNull("Reader should return non-null value", reader.get());
+    }
+
+    for (int closeAttempt = 0; closeAttempt < 5; closeAttempt++) {
+      reader.close();
+      for (int i = 0; i < 5; i++) {
+        Assert.assertTrue("Iterator should be closed after read exhausion",
+            reader.isIteratorClosed(tasks.get(i)));
+      }
+      for (int i = 5; i < 10; i++) {
+        Assert.assertFalse("Iterator should not be created eagerly for tasks",
+            reader.hasIterator(tasks.get(i)));
+      }
+    }
+  }
+
+  private List<FileScanTask> createFileScanTasks(Integer totalTasks, Integer recordPerTask) throws IOException {

Review comment:
       This method relies on the `planFiles` contract that each data file corresponds to one scan task, in order to produce the desired number of tasks from the `totalTasks` argument.
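
   A hedged sketch of that idea (not the PR's exact code; `createDummyTable` and `writeParquetFile` are hypothetical helpers standing in for the setup elided from the quoted diff):

   ```java
   private List<FileScanTask> createFileScanTasks(Integer totalTasks, Integer recordPerTask)
       throws IOException {
     Table table = createDummyTable(temp.newFolder());            // hypothetical helper
     for (int i = 0; i < totalTasks; i++) {
       DataFile file = writeParquetFile(table, recordPerTask);    // hypothetical helper
       table.newAppend().appendFile(file).commit();               // one data file per task
     }
     // planFiles() yields one FileScanTask per appended data file.
     return StreamSupport.stream(table.newScan().planFiles().spliterator(), false)
         .collect(Collectors.toList());
   }
   ```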






[GitHub] [iceberg] rdblue commented on pull request #1563: Close source data iterator for Spark data reader: fixing timeout due to S3A connection pool

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1563:
URL: https://github.com/apache/iceberg/pull/1563#issuecomment-706612264


   Thanks @mickjermsurawong-stripe! I've merged this.

