You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/11/23 12:46:13 UTC

[camel] branch camel-2.20.x updated: CAMEL-12025: Possible Intermittent failures in ReactorStreamsServiceTest

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.20.x by this push:
     new 647c4fd  CAMEL-12025: Possible Intermittent failures in ReactorStreamsServiceTest
647c4fd is described below

commit 647c4fd6006b8c815dc932ecd65473fc90f809ba
Author: Peter Palaga <pp...@redhat.com>
AuthorDate: Thu Nov 23 09:50:32 2017 +0100

    CAMEL-12025: Possible Intermittent failures in ReactorStreamsServiceTest
---
 .../reactor/engine/ReactorStreamsServiceTest.java  | 34 +++++++++++++---------
 1 file changed, 21 insertions(+), 13 deletions(-)

diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
index 19a1f5c..9b9df6b 100644
--- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
+++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.reactor.engine;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,7 +39,7 @@ import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 
 public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport {
-    
+
     // ************************************************
     // Setup
     // ************************************************
@@ -287,70 +291,74 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
     public void testTo() throws Exception {
         context.start();
 
-        AtomicInteger value = new AtomicInteger(0);
-        CountDownLatch latch = new CountDownLatch(1);
+        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
+        CountDownLatch latch = new CountDownLatch(3);
 
         Flux.just(1, 2, 3)
             .flatMap(e -> crs.to("bean:hello", e, String.class))
-            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
+            .doOnNext(res -> values.add(res))
             .doOnNext(res -> latch.countDown())
             .subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
     }
 
     @Test
     public void testToWithExchange() throws Exception {
         context.start();
 
-        AtomicInteger value = new AtomicInteger(0);
-        CountDownLatch latch = new CountDownLatch(1);
+        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
+        CountDownLatch latch = new CountDownLatch(3);
 
         Flux.just(1, 2, 3)
             .flatMap(e -> crs.to("bean:hello", e))
             .map(e -> e.getOut())
             .map(e -> e.getBody(String.class))
-            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
+            .doOnNext(res -> values.add(res))
             .doOnNext(res -> latch.countDown())
             .subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
     }
 
     @Test
     public void testToFunction() throws Exception {
         context.start();
 
-        AtomicInteger value = new AtomicInteger(0);
-        CountDownLatch latch = new CountDownLatch(1);
+        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
+        CountDownLatch latch = new CountDownLatch(3);
         Function<Object, Publisher<String>> fun = crs.to("bean:hello", String.class);
 
         Flux.just(1, 2, 3)
             .flatMap(fun)
-            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
+            .doOnNext(res -> values.add(res))
             .doOnNext(res -> latch.countDown())
             .subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
     }
 
     @Test
     public void testToFunctionWithExchange() throws Exception {
         context.start();
 
-        AtomicInteger value = new AtomicInteger(0);
-        CountDownLatch latch = new CountDownLatch(1);
+        Set<String> values = Collections.synchronizedSet(new TreeSet<>());
+        CountDownLatch latch = new CountDownLatch(3);
         Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello");
 
         Flux.just(1, 2, 3)
             .flatMap(fun)
             .map(e -> e.getOut())
             .map(e -> e.getBody(String.class))
-            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
+            .doOnNext(res -> values.add(res))
             .doOnNext(res -> latch.countDown())
             .subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+        Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
     }
 
     // ************************************************

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.