You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/11/23 12:46:13 UTC
[camel] branch camel-2.20.x updated: CAMEL-12025: Possible Intermittent failures in ReactorStreamsServiceTest
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.20.x by this push:
new 647c4fd CAMEL-12025: Possible Intermittent failures in ReactorStreamsServiceTest
647c4fd is described below
commit 647c4fd6006b8c815dc932ecd65473fc90f809ba
Author: Peter Palaga <pp...@redhat.com>
AuthorDate: Thu Nov 23 09:50:32 2017 +0100
CAMEL-12025: Possible Intermittent failures in ReactorStreamsServiceTest
---
.../reactor/engine/ReactorStreamsServiceTest.java | 34 +++++++++++++---------
1 file changed, 21 insertions(+), 13 deletions(-)
diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
index 19a1f5c..9b9df6b 100644
--- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
+++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.reactor.engine;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,7 +39,7 @@ import reactor.core.Disposable;
import reactor.core.publisher.Flux;
public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport {
-
+
// ************************************************
// Setup
// ************************************************
@@ -287,70 +291,74 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
public void testTo() throws Exception {
context.start();
- AtomicInteger value = new AtomicInteger(0);
- CountDownLatch latch = new CountDownLatch(1);
+ Set<String> values = Collections.synchronizedSet(new TreeSet<>());
+ CountDownLatch latch = new CountDownLatch(3);
Flux.just(1, 2, 3)
.flatMap(e -> crs.to("bean:hello", e, String.class))
- .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
+ .doOnNext(res -> values.add(res))
.doOnNext(res -> latch.countDown())
.subscribe();
Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+ Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
}
@Test
public void testToWithExchange() throws Exception {
context.start();
- AtomicInteger value = new AtomicInteger(0);
- CountDownLatch latch = new CountDownLatch(1);
+ Set<String> values = Collections.synchronizedSet(new TreeSet<>());
+ CountDownLatch latch = new CountDownLatch(3);
Flux.just(1, 2, 3)
.flatMap(e -> crs.to("bean:hello", e))
.map(e -> e.getOut())
.map(e -> e.getBody(String.class))
- .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
+ .doOnNext(res -> values.add(res))
.doOnNext(res -> latch.countDown())
.subscribe();
Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+ Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
}
@Test
public void testToFunction() throws Exception {
context.start();
- AtomicInteger value = new AtomicInteger(0);
- CountDownLatch latch = new CountDownLatch(1);
+ Set<String> values = Collections.synchronizedSet(new TreeSet<>());
+ CountDownLatch latch = new CountDownLatch(3);
Function<Object, Publisher<String>> fun = crs.to("bean:hello", String.class);
Flux.just(1, 2, 3)
.flatMap(fun)
- .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
+ .doOnNext(res -> values.add(res))
.doOnNext(res -> latch.countDown())
.subscribe();
Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+ Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
}
@Test
public void testToFunctionWithExchange() throws Exception {
context.start();
- AtomicInteger value = new AtomicInteger(0);
- CountDownLatch latch = new CountDownLatch(1);
+ Set<String> values = Collections.synchronizedSet(new TreeSet<>());
+ CountDownLatch latch = new CountDownLatch(3);
Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello");
Flux.just(1, 2, 3)
.flatMap(fun)
.map(e -> e.getOut())
.map(e -> e.getBody(String.class))
- .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
+ .doOnNext(res -> values.add(res))
.doOnNext(res -> latch.countDown())
.subscribe();
Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
+ Assert.assertEquals(new TreeSet<>(Arrays.asList("Hello 1", "Hello 2", "Hello 3")), values);
}
// ************************************************
--
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.