You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/03 15:01:17 UTC
[camel] 11/11: CAMEL-14219: enforce type conversion on
reactive-streams subscriber (#3362)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit f05914da2c2ee99eb0929c32c786093e0db87558
Author: Nicola Ferraro <ni...@gmail.com>
AuthorDate: Wed Nov 27 05:01:02 2019 +0100
CAMEL-14219: enforce type conversion on reactive-streams subscriber (#3362)
---
.../streams/engine/DefaultCamelReactiveStreamsService.java | 4 ++--
.../component/reactive/streams/util/ConvertingSubscriber.java | 8 ++++++--
.../camel/component/reactor/engine/ReactorStreamsService.java | 4 ++--
.../camel/component/rxjava/engine/RxJavaStreamsService.java | 4 ++--
4 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
index 96ca0ed..19d50a6 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
@@ -139,7 +139,7 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement
return (Subscriber<T>) streamSubscriber(name);
}
- return new ConvertingSubscriber<>(streamSubscriber(name), context);
+ return new ConvertingSubscriber<>(streamSubscriber(name), context, type);
}
@Override
@@ -249,7 +249,7 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement
@Override
public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
- return new ConvertingSubscriber<>(subscriber(uri), context);
+ return new ConvertingSubscriber<>(subscriber(uri), context, type);
}
@Override
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
index 3e4b375..90e830a 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java
@@ -29,14 +29,18 @@ import org.reactivestreams.Subscription;
*/
public class ConvertingSubscriber<R> implements Subscriber<R> {
+ private Class<R> type;
+
private Subscriber<Exchange> delegate;
private CamelContext context;
- public ConvertingSubscriber(Subscriber<Exchange> delegate, CamelContext context) {
+ public ConvertingSubscriber(Subscriber<Exchange> delegate, CamelContext context, Class<R> type) {
Objects.requireNonNull(delegate, "delegate subscriber cannot be null");
+ Objects.requireNonNull(type, "type cannot be null");
this.delegate = delegate;
this.context = context;
+ this.type = type;
}
@Override
@@ -55,7 +59,7 @@ public class ConvertingSubscriber<R> implements Subscriber<R> {
}
Exchange exchange = new DefaultExchange(context);
- exchange.getIn().setBody(r);
+ exchange.getIn().setBody(r, type);
delegate.onNext(exchange);
}
diff --git a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
index c1ec616..fee248d 100644
--- a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
+++ b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
@@ -115,7 +115,7 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv
return Subscriber.class.cast(subscriber);
}
- return new ConvertingSubscriber<>(subscriber, context);
+ return new ConvertingSubscriber<>(subscriber, context, type);
}
@Override
@@ -190,7 +190,7 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv
@Override
public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
- return new ConvertingSubscriber<>(subscriber(uri), context);
+ return new ConvertingSubscriber<>(subscriber(uri), context, type);
}
@Override
diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
index b367ec9..50e83a1 100644
--- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
+++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
@@ -115,7 +115,7 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive
return Subscriber.class.cast(subscriber);
}
- return new ConvertingSubscriber<>(subscriber, context);
+ return new ConvertingSubscriber<>(subscriber, context, type);
}
@Override
@@ -185,7 +185,7 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive
@Override
public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
- return new ConvertingSubscriber<>(subscriber(uri), context);
+ return new ConvertingSubscriber<>(subscriber(uri), context, type);
}
@Override