You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/07/11 16:25:07 UTC
[camel] branch master updated: camel-reactive-stream: cleanup
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new b8d6fae camel-reactive-stream: cleanup
b8d6fae is described below
commit b8d6faeb09ffe6ba9f5c02d930833bfecc89bc0a
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Fri Jul 10 17:56:50 2020 +0200
camel-reactive-stream: cleanup
---
.../reactive/streams/ReactiveStreamsCamelSubscriber.java | 4 ++--
.../component/reactive/streams/ReactiveStreamsHelper.java | 1 -
.../reactive/streams/api/CamelReactiveStreamsService.java | 3 ++-
.../component/reactive/streams/engine/CamelPublisher.java | 8 ++++----
.../reactive/streams/engine/CamelSubscription.java | 12 ++++++------
.../streams/engine/DefaultCamelReactiveStreamsService.java | 5 +++++
.../reactive/streams/engine/DelayedMonoPublisher.java | 12 ++++++------
.../reactive/streams/engine/UnwrappingPublisher.java | 2 +-
.../component/reactive/streams/util/BodyConverter.java | 11 ++---------
.../reactive/streams/util/ConvertingPublisher.java | 6 +++---
.../reactive/streams/util/ConvertingSubscriber.java | 6 +++---
.../component/reactive/streams/util/MonoPublisher.java | 2 +-
.../streams/support/ReactiveStreamsTestService.java | 6 ++++++
.../component/reactor/engine/ReactorStreamsService.java | 5 +++++
.../engine/ReactorStreamsServiceBackpressureTest.java | 4 ++--
.../reactor/engine/ReactorStreamsServiceTest.java | 14 +++++++-------
.../component/rxjava/engine/RxJavaCamelProcessor.java | 2 +-
.../component/rxjava/engine/RxJavaStreamsService.java | 5 +++++
.../component/rxjava/engine/RxJavaStreamsServiceTest.java | 10 ++++++----
19 files changed, 67 insertions(+), 51 deletions(-)
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
index 0d0ffe7..875df93 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
@@ -37,12 +37,12 @@ public class ReactiveStreamsCamelSubscriber implements Subscriber<Exchange>, Clo
*/
private static final long UNBOUNDED_REQUESTS = Long.MAX_VALUE;
+ private final String name;
+
private ReactiveStreamsConsumer consumer;
private Subscription subscription;
- private String name;
-
private long requested;
private long inflightCount;
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java
index a380934..a9752c4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java
@@ -110,7 +110,6 @@ public final class ReactiveStreamsHelper {
}
}
- @SuppressWarnings("unchecked")
public static CamelReactiveStreamsServiceFactory resolveServiceFactory(CamelContext context, String serviceType) {
try {
FactoryFinder finder = context.adapt(ExtendedCamelContext.class).getFactoryFinder(ReactiveStreamsConstants.SERVICE_PATH);
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index baea25f..c0d31f0 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -23,6 +23,7 @@ import org.apache.camel.Service;
import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.spi.HasCamelContext;
import org.apache.camel.spi.HasId;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -30,7 +31,7 @@ import org.reactivestreams.Subscriber;
/**
* The interface to which any implementation of the reactive-streams engine should comply.
*/
-public interface CamelReactiveStreamsService extends Service, HasId {
+public interface CamelReactiveStreamsService extends Service, HasId, HasCamelContext {
/*
* Main API methods.
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index 445cdd7..28327cf 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -45,13 +45,13 @@ public class CamelPublisher implements Publisher<Exchange>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CamelPublisher.class);
- private ExecutorService workerPool;
+ private final ExecutorService workerPool;
- private String name;
+ private final String name;
- private ReactiveStreamsBackpressureStrategy backpressureStrategy;
+ private final List<CamelSubscription> subscriptions = new CopyOnWriteArrayList<>();
- private List<CamelSubscription> subscriptions = new CopyOnWriteArrayList<>();
+ private ReactiveStreamsBackpressureStrategy backpressureStrategy;
private ReactiveStreamsProducer producer;
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index ff60d4f..3b9f0e9 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -43,17 +43,17 @@ public class CamelSubscription implements Subscription {
private static final Logger LOG = LoggerFactory.getLogger(CamelSubscription.class);
- private String id;
+ private final String id;
- private ExecutorService workerPool;
+ private final ExecutorService workerPool;
- private String streamName;
+ private final String streamName;
- private CamelPublisher publisher;
+ private final CamelPublisher publisher;
- private ReactiveStreamsBackpressureStrategy backpressureStrategy;
+ private final Subscriber<? super Exchange> subscriber;
- private Subscriber<? super Exchange> subscriber;
+ private ReactiveStreamsBackpressureStrategy backpressureStrategy;
/**
* The lock is used just for the time necessary to read/write shared variables.
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
index b34cd76..a9238e4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
@@ -88,6 +88,11 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement
}
@Override
+ public CamelContext getCamelContext() {
+ return context;
+ }
+
+ @Override
protected void doInit() {
if (this.workerPool == null) {
this.workerPool = context.getExecutorServiceManager().newThreadPool(
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
index 74a937c..b929166 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
@@ -36,15 +36,15 @@ public class DelayedMonoPublisher<T> implements Publisher<T> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedMonoPublisher.class);
- private ExecutorService workerPool;
+ private final ExecutorService workerPool;
- private volatile T data;
+ private final List<MonoSubscription> subscriptions = new CopyOnWriteArrayList<>();
- private volatile Throwable exception;
+ private final AtomicBoolean flushing = new AtomicBoolean(false);
- private List<MonoSubscription> subscriptions = new CopyOnWriteArrayList<>();
+ private volatile T data;
- private AtomicBoolean flushing = new AtomicBoolean(false);
+ private volatile Throwable exception;
public DelayedMonoPublisher(ExecutorService workerPool) {
this.workerPool = workerPool;
@@ -130,7 +130,7 @@ public class DelayedMonoPublisher<T> implements Publisher<T> {
private volatile boolean requested;
- private Subscriber<? super T> subscriber;
+ private final Subscriber<? super T> subscriber;
private MonoSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
index 30dac80..6910c90 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
@@ -30,7 +30,7 @@ import org.reactivestreams.Subscription;
* It calls the dispatch callback if defined.
*/
public class UnwrappingPublisher implements Publisher<Exchange> {
- private Publisher<Exchange> delegate;
+ private final Publisher<Exchange> delegate;
public UnwrappingPublisher(Publisher<Exchange> delegate) {
Objects.requireNonNull(delegate, "delegate publisher cannot be null");
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java
index 2f11562..5dc7bd8 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java
@@ -33,17 +33,10 @@ public final class BodyConverter<T> implements Function<Exchange, T> {
@Override
public T apply(Exchange exchange) {
- T answer;
-
- if (exchange.hasOut()) {
- answer = exchange.getOut().getBody(type);
- } else {
- answer = exchange.getIn().getBody(type);
- }
-
- return answer;
+ return exchange.getMessage().getBody(type);
}
+ @SuppressWarnings("unchecked")
public static <C> BodyConverter<C> forType(Class<C> type) {
return BodyConverter.class.cast(
CACHE.computeIfAbsent(type, BodyConverter::new)
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
index 1e94b13..154b08a 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
@@ -34,10 +34,10 @@ public class ConvertingPublisher<R> implements Publisher<R> {
private static final Logger LOG = LoggerFactory.getLogger(ConvertingPublisher.class);
- private Publisher<Exchange> delegate;
+ private final Publisher<Exchange> delegate;
- private Class<R> type;
- private BodyConverter<R> converter;
+ private final Class<R> type;
+ private final BodyConverter<R> converter;
public ConvertingPublisher(Publisher<Exchange> delegate, Class<R> type) {
Objects.requireNonNull(delegate, "delegate publisher cannot be null");
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
index 90e830a..bd25706 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
@@ -29,11 +29,11 @@ import org.reactivestreams.Subscription;
*/
public class ConvertingSubscriber<R> implements Subscriber<R> {
- private Class<R> type;
+ private final Class<R> type;
- private Subscriber<Exchange> delegate;
+ private final Subscriber<Exchange> delegate;
- private CamelContext context;
+ private final CamelContext context;
public ConvertingSubscriber(Subscriber<Exchange> delegate, CamelContext context, Class<R> type) {
Objects.requireNonNull(delegate, "delegate subscriber cannot be null");
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
index 4775232..3040889 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
@@ -27,7 +27,7 @@ import org.reactivestreams.Subscription;
*/
public class MonoPublisher<T> implements Publisher<T> {
- private T item;
+ private final T item;
public MonoPublisher(T item) {
this.item = item;
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 65e54e2..eaf72e4 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.reactive.streams.support;
import java.util.function.Function;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
@@ -38,6 +39,11 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
}
@Override
+ public CamelContext getCamelContext() {
+ return null;
+ }
+
+ @Override
public void start() {
}
diff --git a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
index 7640274..4c76e9c 100644
--- a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
+++ b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
@@ -64,6 +64,11 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv
return ReactorStreamsConstants.SERVICE_NAME;
}
+ @Override
+ public CamelContext getCamelContext() {
+ return context;
+ }
+
// ******************************************
// Lifecycle
// ******************************************
diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
index f0b9c2b..ad0e128 100644
--- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
+++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
@@ -108,7 +108,7 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService
Thread.sleep(200); // add other time to ensure no other items arrive
Assert.assertEquals(2, queue.size());
- int sum = queue.stream().reduce((i, j) -> i + j).get();
+ int sum = queue.stream().reduce(Integer::sum).get();
Assert.assertEquals(3, sum); // 1 + 2 = 3
subscriber.cancel();
@@ -158,7 +158,7 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService
// Assert.assertEquals(2, queue.size());
Assert.assertEquals(3, queue.size());
- int sum = queue.stream().reduce((i, j) -> i + j).get();
+ int sum = queue.stream().reduce(Integer::sum).get();
// Assert.assertEquals(21, sum); // 1 + 20 = 21
Assert.assertEquals(23, sum); // 1 + 2 + 20 = 23
diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
index 690c578..6ef2a2f 100644
--- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
+++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
@@ -291,7 +291,7 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
Flux.just(1, 2, 3)
.flatMap(e -> crs.to("bean:hello", e, String.class))
- .doOnNext(res -> values.add(res))
+ .doOnNext(values::add)
.doOnNext(res -> latch.countDown())
.subscribe();
@@ -308,9 +308,9 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
Flux.just(1, 2, 3)
.flatMap(e -> crs.to("bean:hello", e))
- .map(e -> e.getMessage())
+ .map(Exchange::getMessage)
.map(e -> e.getBody(String.class))
- .doOnNext(res -> values.add(res))
+ .doOnNext(values::add)
.doOnNext(res -> latch.countDown())
.subscribe();
@@ -328,7 +328,7 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
Flux.just(1, 2, 3)
.flatMap(fun)
- .doOnNext(res -> values.add(res))
+ .doOnNext(values::add)
.doOnNext(res -> latch.countDown())
.subscribe();
@@ -346,9 +346,9 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
Flux.just(1, 2, 3)
.flatMap(fun)
- .map(e -> e.getMessage())
+ .map(Exchange::getMessage)
.map(e -> e.getBody(String.class))
- .doOnNext(res -> values.add(res))
+ .doOnNext(values::add)
.doOnNext(res -> latch.countDown())
.subscribe();
@@ -380,7 +380,7 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
int idx = 1;
for (Exchange ex : mock.getExchanges()) {
- Assert.assertEquals(new Integer(idx++), ex.getIn().getBody(Integer.class));
+ Assert.assertEquals(Integer.valueOf(idx++), ex.getIn().getBody(Integer.class));
}
}
diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
index 6110ff4..1088c17 100644
--- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
+++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
@@ -38,7 +38,7 @@ final class RxJavaCamelProcessor implements Closeable {
private final String name;
private final RxJavaStreamsService service;
private final AtomicReference<FlowableEmitter<Exchange>> camelEmitter;
- private FlowableProcessor<Exchange> publisher;
+ private final FlowableProcessor<Exchange> publisher;
private ReactiveStreamsProducer camelProducer;
RxJavaCamelProcessor(RxJavaStreamsService service, String name) {
diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
index 75e8ec9..860a055 100644
--- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
+++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
@@ -64,6 +64,11 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive
return RxJavaStreamsConstants.SERVICE_NAME;
}
+ @Override
+ public CamelContext getCamelContext() {
+ return context;
+ }
+
// ******************************************
// Lifecycle
// ******************************************
diff --git a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java
index cc98a98..dd068f6 100644
--- a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java
+++ b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java
@@ -162,7 +162,9 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
CountDownLatch latch = new CountDownLatch(3);
Flowable.fromPublisher(timer).map(exchange -> ExchangeHelper.getHeaderOrProperty(exchange, Exchange.TIMER_COUNTER, Integer.class))
- .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())).doOnNext(res -> latch.countDown()).subscribe();
+ .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
+ .doOnNext(res -> latch.countDown())
+ .subscribe();
Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
}
@@ -255,7 +257,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
AtomicInteger value = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
- Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e)).map(e -> e.getMessage()).map(e -> e.getBody(String.class))
+ Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e)).map(Exchange::getMessage).map(e -> e.getBody(String.class))
.doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe();
Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
@@ -282,7 +284,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
CountDownLatch latch = new CountDownLatch(1);
Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello");
- Flowable.just(1, 2, 3).flatMap(fun::apply).map(e -> e.getMessage()).map(e -> e.getBody(String.class))
+ Flowable.just(1, 2, 3).flatMap(fun::apply).map(Exchange::getMessage).map(e -> e.getBody(String.class))
.doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe();
Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
@@ -310,7 +312,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
int idx = 1;
for (Exchange ex : mock.getExchanges()) {
- Assert.assertEquals(new Integer(idx++), ex.getIn().getBody(Integer.class));
+ Assert.assertEquals(Integer.valueOf(idx++), ex.getIn().getBody(Integer.class));
}
}