You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2020/04/24 23:53:18 UTC
[druid] branch 0.18.1 updated: fix issue where CloseableIterator.flatMap does not close inner CloseableIterator (#9761) (#9772)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.18.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.1 by this push:
new dc17de5 fix issue where CloseableIterator.flatMap does not close inner CloseableIterator (#9761) (#9772)
dc17de5 is described below
commit dc17de5d36269c210860d203826e813169ba0c93
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Apr 24 16:53:08 2020 -0700
fix issue where CloseableIterator.flatMap does not close inner CloseableIterator (#9761) (#9772)
* fix issue where CloseableIterator.flatMap does not close inner CloseableIterator
* more test
* style
* clarify test
---
.../util/common/parsers/CloseableIterator.java | 5 ++
.../util/common/parsers/CloseableIteratorTest.java | 89 +++++++++++++++++++++-
2 files changed, 91 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
index 45cda5c..d591536 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
@@ -75,6 +75,7 @@ public interface CloseableIterator<T> extends Iterator<T>, Closeable
if (iterator != null) {
try {
iterator.close();
+ iterator = null;
}
catch (IOException e) {
throw new UncheckedIOException(e);
@@ -112,6 +113,10 @@ public interface CloseableIterator<T> extends Iterator<T>, Closeable
public void close() throws IOException
{
delegate.close();
+ if (iterator != null) {
+ iterator.close();
+ iterator = null;
+ }
}
};
}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java b/core/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
index 5434e2d..be2d1d5 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
@@ -23,6 +23,7 @@ import org.apache.druid.java.util.common.CloseableIterators;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -54,10 +55,18 @@ public class CloseableIteratorTest
}
@Test
- public void testFlatMap()
+ public void testFlatMap() throws IOException
{
- final CloseableIterator<Integer> actual = generateTestIterator(8)
- .flatMap(list -> CloseableIterators.withEmptyBaggage(list.iterator()));
+ List<CloseTrackingCloseableIterator<Integer>> innerIterators = new ArrayList<>();
+ final CloseTrackingCloseableIterator<Integer> actual = new CloseTrackingCloseableIterator<>(
+ generateTestIterator(8)
+ .flatMap(list -> {
+ CloseTrackingCloseableIterator<Integer> inner =
+ new CloseTrackingCloseableIterator<>(CloseableIterators.withEmptyBaggage(list.iterator()));
+ innerIterators.add(inner);
+ return inner;
+ })
+ );
final Iterator<Integer> expected = IntStream
.range(0, 8)
.flatMap(i -> IntStream.range(0, i))
@@ -67,6 +76,48 @@ public class CloseableIteratorTest
}
Assert.assertFalse(actual.hasNext());
Assert.assertFalse(expected.hasNext());
+ actual.close();
+ Assert.assertEquals(1, actual.closeCount);
+ for (CloseTrackingCloseableIterator iter : innerIterators) {
+ Assert.assertEquals(1, iter.closeCount);
+ }
+ }
+
+ @Test
+ public void testFlatMapClosedEarly() throws IOException
+ {
+ final int numIterations = 8;
+ List<CloseTrackingCloseableIterator<Integer>> innerIterators = new ArrayList<>();
+ final CloseTrackingCloseableIterator<Integer> actual = new CloseTrackingCloseableIterator<>(
+ generateTestIterator(numIterations)
+ .flatMap(list -> {
+ CloseTrackingCloseableIterator<Integer> inner =
+ new CloseTrackingCloseableIterator<>(CloseableIterators.withEmptyBaggage(list.iterator()));
+ innerIterators.add(inner);
+ return inner;
+ })
+ );
+ final Iterator<Integer> expected = IntStream
+ .range(0, numIterations)
+ .flatMap(i -> IntStream.range(0, i))
+ .iterator();
+
+ // burn through the first few iterators
+ int cnt = 0;
+ int numFlatIterations = 5;
+ while (expected.hasNext() && actual.hasNext() && cnt++ < numFlatIterations) {
+ Assert.assertEquals(expected.next(), actual.next());
+ }
+ // but stop while we still have an open current inner iterator and a few remaining inner iterators
+ Assert.assertTrue(actual.hasNext());
+ Assert.assertTrue(expected.hasNext());
+ Assert.assertEquals(4, innerIterators.size());
+ Assert.assertTrue(innerIterators.get(innerIterators.size() - 1).hasNext());
+ actual.close();
+ Assert.assertEquals(1, actual.closeCount);
+ for (CloseTrackingCloseableIterator iter : innerIterators) {
+ Assert.assertEquals(1, iter.closeCount);
+ }
}
private static CloseableIterator<List<Integer>> generateTestIterator(int numIterates)
@@ -99,4 +150,36 @@ public class CloseableIteratorTest
}
};
}
+
+ static class CloseTrackingCloseableIterator<T> implements CloseableIterator<T>
+ {
+ CloseableIterator<T> inner;
+ int closeCount;
+
+ public CloseTrackingCloseableIterator(CloseableIterator<T> toTrack)
+ {
+ this.inner = toTrack;
+ this.closeCount = 0;
+ }
+
+
+ @Override
+ public void close() throws IOException
+ {
+ inner.close();
+ closeCount++;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return inner.hasNext();
+ }
+
+ @Override
+ public T next()
+ {
+ return inner.next();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org