You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2020/04/24 23:53:18 UTC

[druid] branch 0.18.1 updated: fix issue where CloseableIterator.flatMap does not close inner CloseableIterator (#9761) (#9772)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.18.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.1 by this push:
     new dc17de5  fix issue where CloseableIterator.flatMap does not close inner CloseableIterator (#9761) (#9772)
dc17de5 is described below

commit dc17de5d36269c210860d203826e813169ba0c93
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Apr 24 16:53:08 2020 -0700

    fix issue where CloseableIterator.flatMap does not close inner CloseableIterator (#9761) (#9772)
    
    * fix issue where CloseableIterator.flatMap does not close inner CloseableIterator
    
    * more test
    
    * style
    
    * clarify test
---
 .../util/common/parsers/CloseableIterator.java     |  5 ++
 .../util/common/parsers/CloseableIteratorTest.java | 89 +++++++++++++++++++++-
 2 files changed, 91 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
index 45cda5c..d591536 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
@@ -75,6 +75,7 @@ public interface CloseableIterator<T> extends Iterator<T>, Closeable
           if (iterator != null) {
             try {
               iterator.close();
+              iterator = null;
             }
             catch (IOException e) {
               throw new UncheckedIOException(e);
@@ -112,6 +113,10 @@ public interface CloseableIterator<T> extends Iterator<T>, Closeable
       public void close() throws IOException
       {
         delegate.close();
+        if (iterator != null) {
+          iterator.close();
+          iterator = null;
+        }
       }
     };
   }
diff --git a/core/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java b/core/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
index 5434e2d..be2d1d5 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
@@ -23,6 +23,7 @@ import org.apache.druid.java.util.common.CloseableIterators;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -54,10 +55,18 @@ public class CloseableIteratorTest
   }
 
   @Test
-  public void testFlatMap()
+  public void testFlatMap() throws IOException
   {
-    final CloseableIterator<Integer> actual = generateTestIterator(8)
-        .flatMap(list -> CloseableIterators.withEmptyBaggage(list.iterator()));
+    List<CloseTrackingCloseableIterator<Integer>> innerIterators = new ArrayList<>();
+    final CloseTrackingCloseableIterator<Integer> actual = new CloseTrackingCloseableIterator<>(
+        generateTestIterator(8)
+            .flatMap(list -> {
+              CloseTrackingCloseableIterator<Integer> inner =
+                  new CloseTrackingCloseableIterator<>(CloseableIterators.withEmptyBaggage(list.iterator()));
+              innerIterators.add(inner);
+              return inner;
+            })
+    );
     final Iterator<Integer> expected = IntStream
         .range(0, 8)
         .flatMap(i -> IntStream.range(0, i))
@@ -67,6 +76,48 @@ public class CloseableIteratorTest
     }
     Assert.assertFalse(actual.hasNext());
     Assert.assertFalse(expected.hasNext());
+    actual.close();
+    Assert.assertEquals(1, actual.closeCount);
+    for (CloseTrackingCloseableIterator iter : innerIterators) {
+      Assert.assertEquals(1, iter.closeCount);
+    }
+  }
+
+  @Test
+  public void testFlatMapClosedEarly() throws IOException
+  {
+    final int numIterations = 8;
+    List<CloseTrackingCloseableIterator<Integer>> innerIterators = new ArrayList<>();
+    final CloseTrackingCloseableIterator<Integer> actual = new CloseTrackingCloseableIterator<>(
+        generateTestIterator(numIterations)
+            .flatMap(list -> {
+              CloseTrackingCloseableIterator<Integer> inner =
+                  new CloseTrackingCloseableIterator<>(CloseableIterators.withEmptyBaggage(list.iterator()));
+              innerIterators.add(inner);
+              return inner;
+            })
+    );
+    final Iterator<Integer> expected = IntStream
+        .range(0, numIterations)
+        .flatMap(i -> IntStream.range(0, i))
+        .iterator();
+
+    // burn through the first few iterators
+    int cnt = 0;
+    int numFlatIterations = 5;
+    while (expected.hasNext() && actual.hasNext() && cnt++ < numFlatIterations) {
+      Assert.assertEquals(expected.next(), actual.next());
+    }
+    // but stop while we still have an open current inner iterator and a few remaining inner iterators
+    Assert.assertTrue(actual.hasNext());
+    Assert.assertTrue(expected.hasNext());
+    Assert.assertEquals(4, innerIterators.size());
+    Assert.assertTrue(innerIterators.get(innerIterators.size() - 1).hasNext());
+    actual.close();
+    Assert.assertEquals(1, actual.closeCount);
+    for (CloseTrackingCloseableIterator iter : innerIterators) {
+      Assert.assertEquals(1, iter.closeCount);
+    }
   }
 
   private static CloseableIterator<List<Integer>> generateTestIterator(int numIterates)
@@ -99,4 +150,36 @@ public class CloseableIteratorTest
       }
     };
   }
+
+  static class CloseTrackingCloseableIterator<T> implements CloseableIterator<T>
+  {
+    CloseableIterator<T> inner;
+    int closeCount;
+
+    public CloseTrackingCloseableIterator(CloseableIterator<T> toTrack)
+    {
+      this.inner = toTrack;
+      this.closeCount = 0;
+    }
+
+
+    @Override
+    public void close() throws IOException
+    {
+      inner.close();
+      closeCount++;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return inner.hasNext();
+    }
+
+    @Override
+    public T next()
+    {
+      return inner.next();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org