You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/02/27 13:59:39 UTC
incubator-metron git commit: METRON-728: ReaderSpliteratorTest fails
randomly and extremely rarely closes apache/incubator-metron#463
Repository: incubator-metron
Updated Branches:
refs/heads/master 898d2360c -> 1097f3613
METRON-728: ReaderSpliteratorTest fails randomly and extremely rarely closes apache/incubator-metron#463
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/1097f361
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/1097f361
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/1097f361
Branch: refs/heads/master
Commit: 1097f361309daf29db4958296cd38e4a7ffc407d
Parents: 898d236
Author: cstella <ce...@gmail.com>
Authored: Mon Feb 27 08:59:31 2017 -0500
Committer: cstella <ce...@gmail.com>
Committed: Mon Feb 27 08:59:31 2017 -0500
----------------------------------------------------------------------
.../utils/file/ReaderSpliteratorTest.java | 144 +++++++++----------
1 file changed, 65 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1097f361/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
index 965840f..eab0ee5 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
@@ -28,10 +28,23 @@ import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.Map;
+import java.util.Spliterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
public class ReaderSpliteratorTest {
/**
@@ -58,8 +71,20 @@ public class ReaderSpliteratorTest {
dataFile.deleteOnExit();
}
- public static BufferedReader getReader() throws FileNotFoundException {
- return new BufferedReader(new FileReader(dataFile));
+ public static BufferedReader getReader() {
+ try {
+ return new BufferedReader(new FileReader(dataFile));
+ } catch (FileNotFoundException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ private static void validateMapCount(Map<String, Integer> count) {
+ Assert.assertEquals(5, count.size());
+ Assert.assertEquals(3, (int)count.get("foo"));
+ Assert.assertEquals(2, (int)count.get("bar"));
+ Assert.assertEquals(1, (int)count.get("and"));
+ Assert.assertEquals(1, (int)count.get("the"));
}
@Test
@@ -69,11 +94,7 @@ public class ReaderSpliteratorTest {
Map<String, Integer> count =
stream.parallel().map( s -> s.trim())
.collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertEquals(5, count.size());
- Assert.assertEquals(3, (int)count.get("foo"));
- Assert.assertEquals(2, (int)count.get("bar"));
- Assert.assertEquals(1, (int)count.get("and"));
- Assert.assertEquals(1, (int)count.get("the"));
+ validateMapCount(count);
}
}
@@ -83,11 +104,7 @@ public class ReaderSpliteratorTest {
Map<String, Integer> count =
stream.parallel().map(s -> s.trim())
.collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertEquals(5, count.size());
- Assert.assertEquals(3, (int) count.get("foo"));
- Assert.assertEquals(2, (int) count.get("bar"));
- Assert.assertEquals(1, (int) count.get("and"));
- Assert.assertEquals(1, (int) count.get("the"));
+ validateMapCount(count);
}
}
@@ -97,88 +114,57 @@ public class ReaderSpliteratorTest {
Map<String, Integer> count =
stream.map(s -> s.trim())
.collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertEquals(5, count.size());
- Assert.assertEquals(3, (int) count.get("foo"));
- Assert.assertEquals(2, (int) count.get("bar"));
- Assert.assertEquals(1, (int) count.get("and"));
- Assert.assertEquals(1, (int) count.get("the"));
+ validateMapCount(count);
}
}
- @Test
- public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
- //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used
- try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
- ForkJoinPool forkJoinPool = new ForkJoinPool(2);
- forkJoinPool.submit(() -> {
- Map<String, Integer> threads =
- stream.parallel().map(s -> Thread.currentThread().getName())
- .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertTrue(threads.size() <= 2);
- }
- ).get();
- }
- }
+ private int getNumberOfBatches(final ReaderSpliterator spliterator) throws ExecutionException, InterruptedException {
+ final AtomicInteger numSplits = new AtomicInteger(0);
+ //we want to wrap the spliterator and count the (valid) splits
+ Spliterator<String> delegatingSpliterator = spy(spliterator);
+ doAnswer(invocationOnMock -> {
+ Spliterator<String> ret = spliterator.trySplit();
+ if(ret != null) {
+ numSplits.incrementAndGet();
+ }
+ return ret;
+ }).when(delegatingSpliterator).trySplit();
- @Test
- public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
- //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
- try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
- ForkJoinPool forkJoinPool = new ForkJoinPool(10);
- forkJoinPool.submit(() -> {
- Map<String, Integer> threads =
- stream.parallel().map(s -> Thread.currentThread().getName())
- .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
- }
- ).get();
- }
+ Stream<String> stream = StreamSupport.stream(delegatingSpliterator, true);
+
+ //now run it in a parallel pool and do some calculation that doesn't really matter.
+ ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
+ forkJoinPool.submit(() -> {
+ Map<String, Integer> threads =
+ stream.parallel().map(s -> Thread.currentThread().getName())
+ .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+ Assert.assertTrue(threads.size() > 0);
+ }
+ ).get();
+ return numSplits.get();
}
@Test
- public void testActuallyParallel_mediumBatchNotImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
- //Since this is not parallel and we're not making the stream itself parallel, we should only use one thread from the thread pool.
- try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2, false)) {
- ForkJoinPool forkJoinPool = new ForkJoinPool(10);
- forkJoinPool.submit(() -> {
- Map<String, Integer> threads =
- stream.map(s -> Thread.currentThread().getName())
- .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertTrue(threads.size() == 1);
- }
- ).get();
+ public void testSmallBatch() throws ExecutionException, InterruptedException, IOException {
+ //With 9 elements and a batch of 1, we should have ceil(9/1) = 9 batches
+ try(BufferedReader reader = getReader()) {
+ Assert.assertEquals(9, getNumberOfBatches(new ReaderSpliterator(reader, 1)));
}
}
@Test
- public void testActuallyParallel_mediumBatchImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
- //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
- //despite not calling .parallel() on the stream, we are constructing the stream to be implicitly parallel
- try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2, true)) {
- ForkJoinPool forkJoinPool = new ForkJoinPool(10);
- forkJoinPool.submit(() -> {
- Map<String, Integer> threads =
- stream.map(s -> Thread.currentThread().getName())
- .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
- }
- ).get();
+ public void testMediumBatch() throws ExecutionException, InterruptedException, IOException {
+ //With 9 elements and a batch of 2, we should have ceil(9/2) = 5 batches
+ try(BufferedReader reader = getReader()) {
+ Assert.assertEquals(5, getNumberOfBatches(new ReaderSpliterator(reader, 2)));
}
}
@Test
- public void testActuallyParallel_bigBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
- //With 9 elements and a batch of 10, we should only have one batch, so only one thread will be used
- //despite the thread pool size of 2.
- try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 10)) {
- ForkJoinPool forkJoinPool = new ForkJoinPool(2);
- forkJoinPool.submit(() -> {
- Map<String, Integer> threads =
- stream.parallel().map(s -> Thread.currentThread().getName())
- .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
- Assert.assertEquals(1, threads.size());
- }
- ).get();
+ public void testOneBigBatch() throws ExecutionException, InterruptedException, IOException {
+ //With 9 elements and a batch of 10, we should only have one batch
+ try(BufferedReader reader = getReader()) {
+ Assert.assertEquals(1, getNumberOfBatches(new ReaderSpliterator(reader, 10)));
}
}