You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/02/27 13:59:39 UTC

incubator-metron git commit: METRON-728: ReaderSpliteratorTest fails randomly and extremely rarely closes apache/incubator-metron#463

Repository: incubator-metron
Updated Branches:
  refs/heads/master 898d2360c -> 1097f3613


METRON-728: ReaderSpliteratorTest fails randomly and extremely rarely closes apache/incubator-metron#463


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/1097f361
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/1097f361
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/1097f361

Branch: refs/heads/master
Commit: 1097f361309daf29db4958296cd38e4a7ffc407d
Parents: 898d236
Author: cstella <ce...@gmail.com>
Authored: Mon Feb 27 08:59:31 2017 -0500
Committer: cstella <ce...@gmail.com>
Committed: Mon Feb 27 08:59:31 2017 -0500

----------------------------------------------------------------------
 .../utils/file/ReaderSpliteratorTest.java       | 144 +++++++++----------
 1 file changed, 65 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1097f361/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
index 965840f..eab0ee5 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
@@ -28,10 +28,23 @@ import java.nio.file.Files;
 import java.nio.file.OpenOption;
 import java.nio.file.StandardOpenOption;
 import java.util.Map;
+import java.util.Spliterator;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class ReaderSpliteratorTest {
   /**
@@ -58,8 +71,20 @@ public class ReaderSpliteratorTest {
     dataFile.deleteOnExit();
   }
 
-  public static BufferedReader getReader() throws FileNotFoundException {
-    return new BufferedReader(new FileReader(dataFile));
+  public static BufferedReader getReader() {
+    try {
+      return new BufferedReader(new FileReader(dataFile));
+    } catch (FileNotFoundException e) {
+      throw new IllegalStateException(e.getMessage(), e);
+    }
+  }
+
+  private static void validateMapCount(Map<String, Integer> count) {
+    Assert.assertEquals(5, count.size());
+    Assert.assertEquals(3, (int)count.get("foo"));
+    Assert.assertEquals(2, (int)count.get("bar"));
+    Assert.assertEquals(1, (int)count.get("and"));
+    Assert.assertEquals(1, (int)count.get("the"));
   }
 
   @Test
@@ -69,11 +94,7 @@ public class ReaderSpliteratorTest {
       Map<String, Integer> count =
               stream.parallel().map( s -> s.trim())
                       .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
-      Assert.assertEquals(5, count.size());
-      Assert.assertEquals(3, (int)count.get("foo"));
-      Assert.assertEquals(2, (int)count.get("bar"));
-      Assert.assertEquals(1, (int)count.get("and"));
-      Assert.assertEquals(1, (int)count.get("the"));
+      validateMapCount(count);
     }
   }
 
@@ -83,11 +104,7 @@ public class ReaderSpliteratorTest {
       Map<String, Integer> count =
               stream.parallel().map(s -> s.trim())
                       .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
-      Assert.assertEquals(5, count.size());
-      Assert.assertEquals(3, (int) count.get("foo"));
-      Assert.assertEquals(2, (int) count.get("bar"));
-      Assert.assertEquals(1, (int) count.get("and"));
-      Assert.assertEquals(1, (int) count.get("the"));
+      validateMapCount(count);
     }
   }
 
@@ -97,88 +114,57 @@ public class ReaderSpliteratorTest {
       Map<String, Integer> count =
               stream.map(s -> s.trim())
                       .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
-      Assert.assertEquals(5, count.size());
-      Assert.assertEquals(3, (int) count.get("foo"));
-      Assert.assertEquals(2, (int) count.get("bar"));
-      Assert.assertEquals(1, (int) count.get("and"));
-      Assert.assertEquals(1, (int) count.get("the"));
+      validateMapCount(count);
     }
   }
 
-  @Test
-  public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
-    //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used
-    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
-      ForkJoinPool forkJoinPool = new ForkJoinPool(2);
-      forkJoinPool.submit(() -> {
-                Map<String, Integer> threads =
-                        stream.parallel().map(s -> Thread.currentThread().getName())
-                                .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
-                Assert.assertTrue(threads.size() <= 2);
-              }
-      ).get();
-    }
-  }
+  private int getNumberOfBatches(final ReaderSpliterator spliterator) throws ExecutionException, InterruptedException {
+    final AtomicInteger numSplits = new AtomicInteger(0);
+    //we want to wrap the spliterator and count the (valid) splits
+    Spliterator<String> delegatingSpliterator = spy(spliterator);
+    doAnswer(invocationOnMock -> {
+      Spliterator<String> ret = spliterator.trySplit();
+      if(ret != null) {
+        numSplits.incrementAndGet();
+      }
+      return ret;
+    }).when(delegatingSpliterator).trySplit();
 
-  @Test
-  public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
-    //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
-    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
-      ForkJoinPool forkJoinPool = new ForkJoinPool(10);
-      forkJoinPool.submit(() -> {
-                Map<String, Integer> threads =
-                        stream.parallel().map(s -> Thread.currentThread().getName())
-                                .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
-                Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
-              }
-      ).get();
-    }
+    Stream<String> stream = StreamSupport.stream(delegatingSpliterator, true);
+
+    //now run it in a parallel pool and do some calculation that doesn't really matter.
+    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
+    forkJoinPool.submit(() -> {
+                    Map<String, Integer> threads =
+                      stream.parallel().map(s -> Thread.currentThread().getName())
+                              .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+                    Assert.assertTrue(threads.size() > 0);
+            }
+    ).get();
+    return numSplits.get();
   }
 
   @Test
-  public void testActuallyParallel_mediumBatchNotImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
-    //Since this is not parallel and we're not making the stream itself parallel, we should only use one thread from the thread pool.
-    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2, false)) {
-      ForkJoinPool forkJoinPool = new ForkJoinPool(10);
-      forkJoinPool.submit(() -> {
-                Map<String, Integer> threads =
-                        stream.map(s -> Thread.currentThread().getName())
-                                .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
-                Assert.assertTrue(threads.size() == 1);
-              }
-      ).get();
+  public void testSmallBatch() throws ExecutionException, InterruptedException, IOException {
+    //With 9 elements and a batch of 1, we should have ceil(9/1) = 9 batches
+    try(BufferedReader reader = getReader()) {
+      Assert.assertEquals(9, getNumberOfBatches(new ReaderSpliterator(reader, 1)));
     }
   }
 
   @Test
-  public void testActuallyParallel_mediumBatchImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
-    //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
-    //despite not calling .parallel() on the stream, we are constructing the stream to be implicitly parallel
-    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2, true)) {
-      ForkJoinPool forkJoinPool = new ForkJoinPool(10);
-      forkJoinPool.submit(() -> {
-                Map<String, Integer> threads =
-                        stream.map(s -> Thread.currentThread().getName())
-                                .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
-                Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
-              }
-      ).get();
+  public void testMediumBatch() throws ExecutionException, InterruptedException, IOException {
+    //With 9 elements and a batch of 2, we should have ceil(9/2) = 5 batches
+    try(BufferedReader reader = getReader()) {
+      Assert.assertEquals(5, getNumberOfBatches(new ReaderSpliterator(reader, 2)));
     }
   }
 
   @Test
-  public void testActuallyParallel_bigBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
-    //With 9 elements and a batch of 10, we should only have one batch, so only one thread will be used
-    //despite the thread pool size of 2.
-    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 10)) {
-      ForkJoinPool forkJoinPool = new ForkJoinPool(2);
-      forkJoinPool.submit(() -> {
-                Map<String, Integer> threads =
-                        stream.parallel().map(s -> Thread.currentThread().getName())
-                                .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
-                Assert.assertEquals(1, threads.size());
-              }
-      ).get();
+  public void testOneBigBatch() throws ExecutionException, InterruptedException, IOException {
+    //With 9 elements and a batch of 10, we should only have one batch
+    try(BufferedReader reader = getReader()) {
+      Assert.assertEquals(1, getNumberOfBatches(new ReaderSpliterator(reader, 10)));
     }
   }