You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/03 15:01:17 UTC

[camel] 11/11: CAMEL-14219: enforce type conversion on reactive-streams subscriber (#3362)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f05914da2c2ee99eb0929c32c786093e0db87558
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Wed Nov 27 05:01:02 2019 +0100

    CAMEL-14219: enforce type conversion on reactive-streams subscriber (#3362)
---
 .../streams/engine/DefaultCamelReactiveStreamsService.java        | 4 ++--
 .../component/reactive/streams/util/ConvertingSubscriber.java     | 8 ++++++--
 .../camel/component/reactor/engine/ReactorStreamsService.java     | 4 ++--
 .../camel/component/rxjava/engine/RxJavaStreamsService.java       | 4 ++--
 4 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
index 96ca0ed..19d50a6 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
@@ -139,7 +139,7 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement
             return (Subscriber<T>) streamSubscriber(name);
         }
 
-        return new ConvertingSubscriber<>(streamSubscriber(name), context);
+        return new ConvertingSubscriber<>(streamSubscriber(name), context, type);
     }
 
     @Override
@@ -249,7 +249,7 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement
 
     @Override
     public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
-        return new ConvertingSubscriber<>(subscriber(uri), context);
+        return new ConvertingSubscriber<>(subscriber(uri), context, type);
     }
 
     @Override
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
index 3e4b375..90e830a 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
@@ -29,14 +29,18 @@ import org.reactivestreams.Subscription;
  */
 public class ConvertingSubscriber<R> implements Subscriber<R> {
 
+    private Class<R> type;
+
     private Subscriber<Exchange> delegate;
 
     private CamelContext context;
 
-    public ConvertingSubscriber(Subscriber<Exchange> delegate, CamelContext context) {
+    public ConvertingSubscriber(Subscriber<Exchange> delegate, CamelContext context, Class<R> type) {
         Objects.requireNonNull(delegate, "delegate subscriber cannot be null");
+        Objects.requireNonNull(type, "type cannot be null");
         this.delegate = delegate;
         this.context = context;
+        this.type = type;
     }
 
     @Override
@@ -55,7 +59,7 @@ public class ConvertingSubscriber<R> implements Subscriber<R> {
         }
 
         Exchange exchange = new DefaultExchange(context);
-        exchange.getIn().setBody(r);
+        exchange.getIn().setBody(r, type);
         delegate.onNext(exchange);
     }
 
diff --git a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
index c1ec616..fee248d 100644
--- a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
+++ b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
@@ -115,7 +115,7 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv
             return Subscriber.class.cast(subscriber);
         }
 
-        return new ConvertingSubscriber<>(subscriber, context);
+        return new ConvertingSubscriber<>(subscriber, context, type);
     }
 
     @Override
@@ -190,7 +190,7 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv
 
     @Override
     public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
-        return new ConvertingSubscriber<>(subscriber(uri), context);
+        return new ConvertingSubscriber<>(subscriber(uri), context, type);
     }
 
     @Override
diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
index b367ec9..50e83a1 100644
--- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
+++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
@@ -115,7 +115,7 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive
             return Subscriber.class.cast(subscriber);
         }
 
-        return new ConvertingSubscriber<>(subscriber, context);
+        return new ConvertingSubscriber<>(subscriber, context, type);
     }
 
     @Override
@@ -185,7 +185,7 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive
 
     @Override
     public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
-        return new ConvertingSubscriber<>(subscriber(uri), context);
+        return new ConvertingSubscriber<>(subscriber(uri), context, type);
     }
 
     @Override