You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/07/11 16:25:07 UTC

[camel] branch master updated: camel-reactive-stream: cleanup

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new b8d6fae  camel-reactive-stream: cleanup
b8d6fae is described below

commit b8d6faeb09ffe6ba9f5c02d930833bfecc89bc0a
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Fri Jul 10 17:56:50 2020 +0200

    camel-reactive-stream: cleanup
---
 .../reactive/streams/ReactiveStreamsCamelSubscriber.java   |  4 ++--
 .../component/reactive/streams/ReactiveStreamsHelper.java  |  1 -
 .../reactive/streams/api/CamelReactiveStreamsService.java  |  3 ++-
 .../component/reactive/streams/engine/CamelPublisher.java  |  8 ++++----
 .../reactive/streams/engine/CamelSubscription.java         | 12 ++++++------
 .../streams/engine/DefaultCamelReactiveStreamsService.java |  5 +++++
 .../reactive/streams/engine/DelayedMonoPublisher.java      | 12 ++++++------
 .../reactive/streams/engine/UnwrappingPublisher.java       |  2 +-
 .../component/reactive/streams/util/BodyConverter.java     | 11 ++---------
 .../reactive/streams/util/ConvertingPublisher.java         |  6 +++---
 .../reactive/streams/util/ConvertingSubscriber.java        |  6 +++---
 .../component/reactive/streams/util/MonoPublisher.java     |  2 +-
 .../streams/support/ReactiveStreamsTestService.java        |  6 ++++++
 .../component/reactor/engine/ReactorStreamsService.java    |  5 +++++
 .../engine/ReactorStreamsServiceBackpressureTest.java      |  4 ++--
 .../reactor/engine/ReactorStreamsServiceTest.java          | 14 +++++++-------
 .../component/rxjava/engine/RxJavaCamelProcessor.java      |  2 +-
 .../component/rxjava/engine/RxJavaStreamsService.java      |  5 +++++
 .../component/rxjava/engine/RxJavaStreamsServiceTest.java  | 10 ++++++----
 19 files changed, 67 insertions(+), 51 deletions(-)

diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
index 0d0ffe7..875df93 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java
@@ -37,12 +37,12 @@ public class ReactiveStreamsCamelSubscriber implements Subscriber<Exchange>, Clo
      */
     private static final long UNBOUNDED_REQUESTS = Long.MAX_VALUE;
 
+    private final String name;
+
     private ReactiveStreamsConsumer consumer;
 
     private Subscription subscription;
 
-    private String name;
-
     private long requested;
 
     private long inflightCount;
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java
index a380934..a9752c4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java
@@ -110,7 +110,6 @@ public final class ReactiveStreamsHelper {
         }
     }
 
-    @SuppressWarnings("unchecked")
     public static CamelReactiveStreamsServiceFactory resolveServiceFactory(CamelContext context, String serviceType) {
         try {
             FactoryFinder finder = context.adapt(ExtendedCamelContext.class).getFactoryFinder(ReactiveStreamsConstants.SERVICE_PATH);
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index baea25f..c0d31f0 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -23,6 +23,7 @@ import org.apache.camel.Service;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.spi.HasCamelContext;
 import org.apache.camel.spi.HasId;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
@@ -30,7 +31,7 @@ import org.reactivestreams.Subscriber;
 /**
  * The interface to which any implementation of the reactive-streams engine should comply.
  */
-public interface CamelReactiveStreamsService extends Service, HasId {
+public interface CamelReactiveStreamsService extends Service, HasId, HasCamelContext {
 
     /*
      * Main API methods.
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index 445cdd7..28327cf 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -45,13 +45,13 @@ public class CamelPublisher implements Publisher<Exchange>, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelPublisher.class);
 
-    private ExecutorService workerPool;
+    private final ExecutorService workerPool;
 
-    private String name;
+    private final String name;
 
-    private ReactiveStreamsBackpressureStrategy backpressureStrategy;
+    private final List<CamelSubscription> subscriptions = new CopyOnWriteArrayList<>();
 
-    private List<CamelSubscription> subscriptions = new CopyOnWriteArrayList<>();
+    private ReactiveStreamsBackpressureStrategy backpressureStrategy;
 
     private ReactiveStreamsProducer producer;
 
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index ff60d4f..3b9f0e9 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -43,17 +43,17 @@ public class CamelSubscription implements Subscription {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelSubscription.class);
 
-    private String id;
+    private final String id;
 
-    private ExecutorService workerPool;
+    private final ExecutorService workerPool;
 
-    private String streamName;
+    private final String streamName;
 
-    private CamelPublisher publisher;
+    private final CamelPublisher publisher;
 
-    private ReactiveStreamsBackpressureStrategy backpressureStrategy;
+    private final Subscriber<? super Exchange> subscriber;
 
-    private Subscriber<? super Exchange> subscriber;
+    private ReactiveStreamsBackpressureStrategy backpressureStrategy;
 
     /**
      * The lock is used just for the time necessary to read/write shared variables.
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
index b34cd76..a9238e4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
@@ -88,6 +88,11 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement
     }
 
     @Override
+    public CamelContext getCamelContext() {
+        return context;
+    }
+
+    @Override
     protected void doInit() {
         if (this.workerPool == null) {
             this.workerPool = context.getExecutorServiceManager().newThreadPool(
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
index 74a937c..b929166 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
@@ -36,15 +36,15 @@ public class DelayedMonoPublisher<T> implements Publisher<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(DelayedMonoPublisher.class);
 
-    private ExecutorService workerPool;
+    private final ExecutorService workerPool;
 
-    private volatile T data;
+    private final List<MonoSubscription> subscriptions = new CopyOnWriteArrayList<>();
 
-    private volatile Throwable exception;
+    private final AtomicBoolean flushing = new AtomicBoolean(false);
 
-    private List<MonoSubscription> subscriptions = new CopyOnWriteArrayList<>();
+    private volatile T data;
 
-    private AtomicBoolean flushing = new AtomicBoolean(false);
+    private volatile Throwable exception;
 
     public DelayedMonoPublisher(ExecutorService workerPool) {
         this.workerPool = workerPool;
@@ -130,7 +130,7 @@ public class DelayedMonoPublisher<T> implements Publisher<T> {
 
         private volatile boolean requested;
 
-        private Subscriber<? super T> subscriber;
+        private final Subscriber<? super T> subscriber;
 
         private MonoSubscription(Subscriber<? super T> subscriber) {
             this.subscriber = subscriber;
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
index 30dac80..6910c90 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
@@ -30,7 +30,7 @@ import org.reactivestreams.Subscription;
  * It calls the dispatch callback if defined.
  */
 public class UnwrappingPublisher implements Publisher<Exchange> {
-    private Publisher<Exchange> delegate;
+    private final Publisher<Exchange> delegate;
 
     public UnwrappingPublisher(Publisher<Exchange> delegate) {
         Objects.requireNonNull(delegate, "delegate publisher cannot be null");
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java
index 2f11562..5dc7bd8 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java
@@ -33,17 +33,10 @@ public final class BodyConverter<T> implements Function<Exchange, T> {
 
     @Override
     public T apply(Exchange exchange) {
-        T answer;
-
-        if (exchange.hasOut()) {
-            answer = exchange.getOut().getBody(type);
-        } else {
-            answer = exchange.getIn().getBody(type);
-        }
-
-        return answer;
+        return exchange.getMessage().getBody(type);
     }
 
+    @SuppressWarnings("unchecked")
     public static <C> BodyConverter<C> forType(Class<C> type) {
         return BodyConverter.class.cast(
             CACHE.computeIfAbsent(type, BodyConverter::new)
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
index 1e94b13..154b08a 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
@@ -34,10 +34,10 @@ public class ConvertingPublisher<R> implements Publisher<R> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ConvertingPublisher.class);
 
-    private Publisher<Exchange> delegate;
+    private final Publisher<Exchange> delegate;
 
-    private Class<R> type;
-    private BodyConverter<R> converter;
+    private final Class<R> type;
+    private final BodyConverter<R> converter;
 
     public ConvertingPublisher(Publisher<Exchange> delegate, Class<R> type) {
         Objects.requireNonNull(delegate, "delegate publisher cannot be null");
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
index 90e830a..bd25706 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
@@ -29,11 +29,11 @@ import org.reactivestreams.Subscription;
  */
 public class ConvertingSubscriber<R> implements Subscriber<R> {
 
-    private Class<R> type;
+    private final Class<R> type;
 
-    private Subscriber<Exchange> delegate;
+    private final Subscriber<Exchange> delegate;
 
-    private CamelContext context;
+    private final CamelContext context;
 
     public ConvertingSubscriber(Subscriber<Exchange> delegate, CamelContext context, Class<R> type) {
         Objects.requireNonNull(delegate, "delegate subscriber cannot be null");
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
index 4775232..3040889 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
@@ -27,7 +27,7 @@ import org.reactivestreams.Subscription;
  */
 public class MonoPublisher<T> implements Publisher<T> {
 
-    private T item;
+    private final T item;
 
     public MonoPublisher(T item) {
         this.item = item;
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 65e54e2..eaf72e4 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.reactive.streams.support;
 
 import java.util.function.Function;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
@@ -38,6 +39,11 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
     }
 
     @Override
+    public CamelContext getCamelContext() {
+        return null;
+    }
+
+    @Override
     public void start() {
 
     }
diff --git a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
index 7640274..4c76e9c 100644
--- a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
+++ b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
@@ -64,6 +64,11 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv
         return ReactorStreamsConstants.SERVICE_NAME;
     }
 
+    @Override
+    public CamelContext getCamelContext() {
+        return context;
+    }
+
     // ******************************************
     // Lifecycle
     // ******************************************
diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
index f0b9c2b..ad0e128 100644
--- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
+++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
@@ -108,7 +108,7 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService
         Thread.sleep(200); // add other time to ensure no other items arrive
         Assert.assertEquals(2, queue.size());
 
-        int sum = queue.stream().reduce((i, j) -> i + j).get();
+        int sum = queue.stream().reduce(Integer::sum).get();
         Assert.assertEquals(3, sum); // 1 + 2 = 3
 
         subscriber.cancel();
@@ -158,7 +158,7 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService
         // Assert.assertEquals(2, queue.size());
         Assert.assertEquals(3, queue.size());
 
-        int sum = queue.stream().reduce((i, j) -> i + j).get();
+        int sum = queue.stream().reduce(Integer::sum).get();
         // Assert.assertEquals(21, sum); // 1 + 20 = 21
         Assert.assertEquals(23, sum); // 1 + 2 + 20 = 23
 
diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
index 690c578..6ef2a2f 100644
--- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
+++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java
@@ -291,7 +291,7 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
 
         Flux.just(1, 2, 3)
             .flatMap(e -> crs.to("bean:hello", e, String.class))
-            .doOnNext(res -> values.add(res))
+            .doOnNext(values::add)
             .doOnNext(res -> latch.countDown())
             .subscribe();
 
@@ -308,9 +308,9 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
 
         Flux.just(1, 2, 3)
             .flatMap(e -> crs.to("bean:hello", e))
-            .map(e -> e.getMessage())
+            .map(Exchange::getMessage)
             .map(e -> e.getBody(String.class))
-            .doOnNext(res -> values.add(res))
+            .doOnNext(values::add)
             .doOnNext(res -> latch.countDown())
             .subscribe();
 
@@ -328,7 +328,7 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
 
         Flux.just(1, 2, 3)
             .flatMap(fun)
-            .doOnNext(res -> values.add(res))
+            .doOnNext(values::add)
             .doOnNext(res -> latch.countDown())
             .subscribe();
 
@@ -346,9 +346,9 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
 
         Flux.just(1, 2, 3)
             .flatMap(fun)
-            .map(e -> e.getMessage())
+            .map(Exchange::getMessage)
             .map(e -> e.getBody(String.class))
-            .doOnNext(res -> values.add(res))
+            .doOnNext(values::add)
             .doOnNext(res -> latch.countDown())
             .subscribe();
 
@@ -380,7 +380,7 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport
 
         int idx = 1;
         for (Exchange ex : mock.getExchanges()) {
-            Assert.assertEquals(new Integer(idx++), ex.getIn().getBody(Integer.class));
+            Assert.assertEquals(Integer.valueOf(idx++), ex.getIn().getBody(Integer.class));
         }
     }
 
diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
index 6110ff4..1088c17 100644
--- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
+++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java
@@ -38,7 +38,7 @@ final class RxJavaCamelProcessor implements Closeable {
     private final String name;
     private final RxJavaStreamsService service;
     private final AtomicReference<FlowableEmitter<Exchange>> camelEmitter;
-    private FlowableProcessor<Exchange> publisher;
+    private final FlowableProcessor<Exchange> publisher;
     private ReactiveStreamsProducer camelProducer;
 
     RxJavaCamelProcessor(RxJavaStreamsService service, String name) {
diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
index 75e8ec9..860a055 100644
--- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
+++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
@@ -64,6 +64,11 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive
         return RxJavaStreamsConstants.SERVICE_NAME;
     }
 
+    @Override
+    public CamelContext getCamelContext() {
+        return context;
+    }
+
     // ******************************************
     // Lifecycle
     // ******************************************
diff --git a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java
index cc98a98..dd068f6 100644
--- a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java
+++ b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java
@@ -162,7 +162,9 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         CountDownLatch latch = new CountDownLatch(3);
 
         Flowable.fromPublisher(timer).map(exchange -> ExchangeHelper.getHeaderOrProperty(exchange, Exchange.TIMER_COUNTER, Integer.class))
-            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())).doOnNext(res -> latch.countDown()).subscribe();
+            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
+            .doOnNext(res -> latch.countDown())
+            .subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
     }
@@ -255,7 +257,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         AtomicInteger value = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(1);
 
-        Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e)).map(e -> e.getMessage()).map(e -> e.getBody(String.class))
+        Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e)).map(Exchange::getMessage).map(e -> e.getBody(String.class))
             .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
@@ -282,7 +284,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         CountDownLatch latch = new CountDownLatch(1);
         Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello");
 
-        Flowable.just(1, 2, 3).flatMap(fun::apply).map(e -> e.getMessage()).map(e -> e.getBody(String.class))
+        Flowable.just(1, 2, 3).flatMap(fun::apply).map(Exchange::getMessage).map(e -> e.getBody(String.class))
             .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
@@ -310,7 +312,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
 
         int idx = 1;
         for (Exchange ex : mock.getExchanges()) {
-            Assert.assertEquals(new Integer(idx++), ex.getIn().getBody(Integer.class));
+            Assert.assertEquals(Integer.valueOf(idx++), ex.getIn().getBody(Integer.class));
         }
     }