You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/02/02 16:06:41 UTC
[1/2] camel git commit: CAMEL-10612: added more examples and improved
interface
Repository: camel
Updated Branches:
refs/heads/master 1885db212 -> b4f75905b
CAMEL-10612: added more examples and improved interface
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b4f75905
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b4f75905
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b4f75905
Branch: refs/heads/master
Commit: b4f75905b9f232a96846f7d2e22fe8b4fbe9fdb9
Parents: 5108cb5
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Thu Feb 2 17:06:02 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Thu Feb 2 17:06:20 2017 +0100
----------------------------------------------------------------------
.../streams/ReactiveStreamsConverter.java | 8 -
.../api/CamelReactiveStreamsService.java | 30 +++-
.../engine/CamelReactiveStreamsServiceImpl.java | 11 ++
.../streams/util/UnwrapStreamProcessor.java | 90 +++++++++++
.../streams/util/UnwrappingStreamProcessor.java | 78 ---------
.../reactive/streams/BeanCallTest.java | 11 +-
.../support/ReactiveStreamsTestService.java | 12 ++
examples/camel-example-reactive-streams/pom.xml | 4 +
.../streams/BasicCamelToReactorExample.java | 98 ++++++++++++
.../BasicCamelToReactorInOutExample.java | 158 +++++++++++++++++++
.../streams/BasicReactorToCamelExample.java | 88 +++++++++++
.../BasicReactorToCamelInOutExample.java | 90 +++++++++++
.../streams/ReactiveStreamsSpringBootApp.java | 31 ----
.../example/reactive/streams/RestExample.java | 82 ++++++++++
.../reactive/streams/SampleCamelRoutes.java | 40 -----
.../reactive/streams/SampleReactiveStreams.java | 52 ------
.../app/ReactiveStreamsSpringBootApp.java | 31 ++++
.../main/resources/META-INF/spring.factories | 24 +++
.../src/main/resources/application.yml | 12 ++
19 files changed, 735 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
index 23f49c9..1fd61ae 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
@@ -16,21 +16,13 @@
*/
package org.apache.camel.component.reactive.streams;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
import org.apache.camel.FallbackConverter;
-import org.apache.camel.Processor;
-import org.apache.camel.component.bean.BeanInfo;
-import org.apache.camel.component.bean.BeanProcessor;
-import org.apache.camel.component.bean.ConstantBeanHolder;
import org.apache.camel.component.reactive.streams.util.MonoPublisher;
-import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor;
import org.apache.camel.spi.TypeConverterRegistry;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.reactivestreams.Publisher;
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index 3a7acd4..03ca6a0 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.reactive.streams.api;
+import java.util.function.Function;
+
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Service;
@@ -83,11 +85,22 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
*
* @param name the stream name
* @param data the data to push
- * @return an publisher with the resulting exchange
+ * @return a publisher with the resulting exchange
*/
Publisher<Exchange> request(String name, Object data);
/**
+ * Returns a function that pushes data into the specified Camel stream and
+ * returns a Publisher (mono) holding the resulting exchange or an error.
+ *
+ * This is a curried version of {@link CamelReactiveStreamsService#request(String, Object)}.
+ *
+ * @param name the stream name
+ * @return a function that returns a publisher with the resulting exchange
+ */
+ Function<?, ? extends Publisher<Exchange>> request(String name);
+
+ /**
* Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
* the exchange output or an error.
*
@@ -95,10 +108,23 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
* @param data the data to push
* @param type the type to which the output should be converted
* @param <T> the generic type of the resulting Publisher
- * @return an publisher with the resulting data
+ * @return a publisher with the resulting data
*/
<T> Publisher<T> request(String name, Object data, Class<T> type);
+ /**
+ * Returns a function that pushes data into the specified Camel stream and
+ * returns a Publisher (mono) holding the exchange output or an error.
+ *
+ * This is a curried version of {@link CamelReactiveStreamsService#request(String, Object, Class)}.
+ *
+ * @param name the stream name
+ * @param type the type to which the output should be converted
+ * @param <T> the generic type of the resulting Publisher
+ * @return a function that returns a publisher with the resulting data
+ */
+ <T> Function<Object, Publisher<T>> request(String name, Class<T> type);
+
/*
* Methods for Camel producers.
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index edffb9e..4b67ce0 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.reactive.streams.engine;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -120,6 +121,11 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
}
@Override
+ public Function<?, ? extends Publisher<Exchange>> request(String name) {
+ return data -> request(name, data);
+ }
+
+ @Override
public <T> Publisher<T> request(String name, Object data, Class<T> type) {
return new ConvertingPublisher<>(request(name, data), type);
}
@@ -154,6 +160,11 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
return publisher;
}
+ @Override
+ public <T> Function<Object, Publisher<T>> request(String name, Class<T> type) {
+ return data -> request(name, data, type);
+ }
+
private CamelPublisher getPayloadPublisher(String name) {
synchronized (this) {
if (!publishers.containsKey(name)) {
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
new file mode 100644
index 0000000..6800ce0
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.util;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * A Processor that converts a Publisher into its content asynchronously.
+ */
+public class UnwrapStreamProcessor implements AsyncProcessor {
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ Object content = exchange.getIn().getBody();
+ if (content instanceof Publisher) {
+ Publisher<?> pub = Publisher.class.cast(content);
+
+ List<Object> data = new LinkedList<>();
+
+ pub.subscribe(new Subscriber<Object>() {
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(Object o) {
+ data.add(o);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ addData();
+ exchange.setException(throwable);
+ callback.done(false);
+ }
+
+ @Override
+ public void onComplete() {
+ addData();
+ callback.done(false);
+ }
+
+ private void addData() {
+ Object body;
+ if (data.size() == 0) {
+ body = null;
+ } else if (data.size() == 1) {
+ body = data.get(0);
+ } else {
+ body = data;
+ }
+ exchange.getIn().setBody(body);
+ }
+
+ });
+ }
+ return false;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
deleted file mode 100644
index fa85fd2..0000000
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.reactive.streams.util;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Exchange;
-import org.apache.camel.util.AsyncProcessorHelper;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-/**
- * A Processor that converts a Publisher into its content asynchronously.
- */
-public class UnwrappingStreamProcessor implements AsyncProcessor {
-
- @Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
- Object content = exchange.getIn().getBody();
- if (content instanceof Publisher) {
- Publisher<?> pub = Publisher.class.cast(content);
-
- List<Object> data = new LinkedList<>();
-
- pub.subscribe(new Subscriber<Object>() {
-
- @Override
- public void onSubscribe(Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(Object o) {
- data.add(o);
- }
-
- @Override
- public void onError(Throwable throwable) {
- exchange.getIn().setBody(data);
- exchange.setException(throwable);
- callback.done(false);
- }
-
- @Override
- public void onComplete() {
- exchange.getIn().setBody(data);
- callback.done(false);
- }
-
- });
- }
- return false;
- }
-
- @Override
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
index 1f05a00..5a532ae 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
@@ -22,7 +22,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Header;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.reactivestreams.Publisher;
@@ -42,7 +42,8 @@ public class BeanCallTest extends CamelTestSupport {
from("direct:num")
.bean(BeanCallTest.this, "processBody")
- .process(new UnwrappingStreamProcessor()).split().body() // Can be removed?
+ .process(new UnwrapStreamProcessor()) // Can be removed?
+ .split().body()
.to("mock:endpoint");
from("direct:handle")
@@ -74,7 +75,8 @@ public class BeanCallTest extends CamelTestSupport {
from("direct:num")
.bean(BeanCallTest.this, "processBodyWrongType")
- .process(new UnwrappingStreamProcessor()).split().body() // Can be removed?
+ .process(new UnwrapStreamProcessor()) // Can be removed?
+ .split().body()
.to("mock:endpoint");
from("direct:handle")
@@ -105,7 +107,8 @@ public class BeanCallTest extends CamelTestSupport {
from("direct:num")
.bean(BeanCallTest.this, "processHeader")
- .process(new UnwrappingStreamProcessor()).split().body() // Can be removed?
+ .process(new UnwrapStreamProcessor()) // Can be removed?
+ .split().body()
.to("mock:endpoint");
from("direct:handle")
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 5288263..6ab9c5e 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.reactive.streams.support;
+import java.util.function.Function;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
@@ -114,6 +116,16 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
return null;
}
+ @Override
+ public Function<?, ? extends Publisher<Exchange>> request(String name) {
+ return null;
+ }
+
+ @Override
+ public <T> Function<Object, Publisher<T>> request(String name, Class<T> type) {
+ return null;
+ }
+
public String getName() {
return name;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/pom.xml b/examples/camel-example-reactive-streams/pom.xml
index abbbe20..10c1a23 100644
--- a/examples/camel-example-reactive-streams/pom.xml
+++ b/examples/camel-example-reactive-streams/pom.xml
@@ -47,6 +47,10 @@
<artifactId>camel-reactive-streams-starter</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-servlet-starter</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
new file mode 100644
index 0000000..1929255
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorExample.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how a reactive stream framework can subscribe to events published
+ * by Camel routes.
+ *
+ * The exchange pattern is in-only from Camel to Reactor.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved into their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.basic.camel-to-reactor")
+public class BasicCamelToReactorExample {
+
+ /**
+ * The reactor streams.
+ */
+ @Component
+ public static class BasicCamelToReactorExampleStreams {
+ private static final Logger LOG = LoggerFactory.getLogger(BasicCamelToReactorExample.class);
+
+ @Autowired
+ private CamelReactiveStreamsService camel;
+
+
+ @PostConstruct
+ public void setupStreams() {
+
+ // Use two streams from Camel
+ Publisher<Integer> numbers = camel.getPublisher("numbers", Integer.class);
+ Publisher<String> strings = camel.getPublisher("strings", String.class);
+
+ Flux.from(numbers)
+ .zipWith(strings) // emit items in pairs
+ .map(tuple -> "BasicCamelToReactor - " + tuple.getT1() + " -> " + tuple.getT2())
+ .doOnNext(LOG::info)
+ .subscribe();
+ }
+
+ }
+
+
+ /**
+ * The Camel Configuration.
+ */
+ @Component
+ public static class BasicCamelToReactorExampleRoutes extends RouteBuilder {
+
+ @Override
+ public void configure() throws Exception {
+
+ // Generating numbers every 5 seconds and forwarding to the stream "numbers"
+ from("timer:clock?period=5000")
+ .setBody().header(Exchange.TIMER_COUNTER)
+ .to("reactive-streams:numbers");
+
+ // Generating strings every 4.9 seconds and forwarding to the stream "strings"
+ from("timer:clock2?period=4900&delay=2000")
+ .setBody().simple("Hello World ${header.CamelTimerCounter}!")
+ .to("reactive-streams:strings");
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
new file mode 100644
index 0000000..1d0024c
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicCamelToReactorInOutExample.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
+import org.reactivestreams.Publisher;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how Camel can asynchronously request data from a reactive stream framework
+ * and continue processing.
+ *
+ * The exchange pattern is in-out from Camel to Reactor.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved into their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.basic.camel-to-reactor-in-out")
+public class BasicCamelToReactorInOutExample {
+
+ /**
+ * The reactor streams.
+ */
+ @Component("userBean")
+ public static class BasicCamelToReactorInOutExampleStreams {
+
+ /**
+ * This method will be called by a Camel route.
+ */
+ public Publisher<UserInfo> getUserInfo(Publisher<Long> userId) {
+ return Flux.from(userId)
+ .map(UserInfo::new)
+ .flatMap(this::retrieveAddress)
+ .flatMap(this::retrieveName);
+ }
+
+ /**
+ * This is a sample utility method.
+ */
+ private Publisher<UserInfo> retrieveAddress(UserInfo user) {
+ // you can do an async database retrieval here
+ return Flux.just(user.withAddress("Address" + user.getId()));
+ }
+
+ private Publisher<UserInfo> retrieveName(UserInfo user) {
+ // you can do an async database retrieval here
+ return Flux.just(user.withName("Name" + user.getId()));
+ }
+
+ }
+
+
+ /**
+ * The Camel Configuration.
+ */
+ @Component
+ public static class BasicCamelToReactorInOutExampleRoutes extends RouteBuilder {
+
+ @Override
+ public void configure() throws Exception {
+
+ // Generate an ID and retrieve user data from reactor
+ from("timer:clock?period=9000&delay=1500")
+ .setBody().header(Exchange.TIMER_COUNTER).convertBodyTo(Long.class) // Sample ID
+ .bean("userBean", "getUserInfo") // Get the user info from reactor code
+ .process(new UnwrapStreamProcessor()).split().body() // Unwrap the Publisher
+ .log("BasicCamelToReactorInOut - Got ${body}");
+
+ }
+
+ }
+
+
+ /**
+ * Sample bean used in the example.
+ */
+ public static class UserInfo {
+
+ private Long id;
+
+ private String name;
+
+ private String address;
+
+ public UserInfo() {
+ }
+
+ public UserInfo(Long id) {
+ this.id = id;
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public UserInfo withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public void setAddress(String address) {
+ this.address = address;
+ }
+
+ public UserInfo withAddress(String address) {
+ this.address = address;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("UserInfo{");
+ sb.append("id=").append(id);
+ sb.append(", name='").append(name).append('\'');
+ sb.append(", address='").append(address).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
new file mode 100644
index 0000000..5a71eb5
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelExample.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import java.time.Duration;
+import javax.annotation.PostConstruct;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.reactivestreams.Subscriber;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how a reactive stream framework can publish events that are consumed
+ * by Camel routes.
+ *
+ * The exchange pattern is in-only from Reactor to Camel.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved into their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.basic.reactor-to-camel")
+public class BasicReactorToCamelExample {
+
+ /**
+ * The reactor streams.
+ */
+ @Component
+ public static class BasicReactorToCamelExampleStreams {
+
+ @Autowired
+ private CamelReactiveStreamsService camel;
+
+
+ @PostConstruct
+ public void setupStreams() {
+
+ // Get a subscriber from camel
+ Subscriber<String> elements = camel.getSubscriber("elements", String.class);
+
+ // Emit a string every 7 seconds and push it to the Camel "elements" stream
+ Flux.interval(Duration.ofSeconds(7))
+ .map(item -> "element " + item)
+ .subscribe(elements);
+
+ }
+ }
+
+
+ /**
+ * The Camel Configuration.
+ */
+ @Component
+ public static class BasicReactorToCamelExampleRoutes extends RouteBuilder {
+
+ @Override
+ public void configure() throws Exception {
+
+ // Transform the body of received items and log
+ from("reactive-streams:elements")
+ .setBody().simple("BasicReactorToCamel - Camel received ${body}")
+ .to("log:INFO");
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
new file mode 100644
index 0000000..36bc59e
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/BasicReactorToCamelInOutExample.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import java.time.Duration;
+import javax.annotation.PostConstruct;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how a reactive stream framework can asynchronously request data from a Camel stream
+ * and continue processing.
+ *
+ * The exchange pattern is in-out from Reactor to Camel.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved into their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.basic.reactor-to-camel-in-out")
+public class BasicReactorToCamelInOutExample {
+
+    /**
+     * Reactor side of the example: sends numbers to Camel and logs the answers.
+     */
+    @Component
+    public static class BasicReactorToCamelInOutExampleStreams {
+        private static final Logger LOG = LoggerFactory.getLogger(BasicReactorToCamelInOutExample.class);
+
+        @Autowired
+        private CamelReactiveStreamsService camel;
+
+        /**
+         * Builds and starts a flux that, every 8 seconds, asks the Camel "sqrt" stream for a result
+         * and logs it.
+         */
+        @PostConstruct
+        public void setupStreams() {
+
+            Flux.interval(Duration.ofSeconds(8))
+                // shift the counter by one so that the sequence starts from 1
+                .map(tick -> tick + 1)
+                // hand each number to the Camel "sqrt" stream and continue with the Double it returns
+                .flatMap(camel.request("sqrt", Double.class))
+                .map(root -> "BasicReactorToCamelInOut - sqrt=" + root)
+                .doOnNext(message -> LOG.info(message))
+                .subscribe();
+
+        }
+    }
+
+
+    /**
+     * Camel side of the example: computes the answer for each request.
+     */
+    @Component
+    public static class BasicReactorToCamelInOutExampleRoutes extends RouteBuilder {
+
+        /**
+         * Defines the route behind the "sqrt" stream.
+         */
+        @Override
+        public void configure() throws Exception {
+
+            // Replace the body of every incoming exchange with its square root.
+            // A real route could be much more complex and use any Camel component to compute the output.
+            from("reactive-streams:sqrt")
+                .setBody().body(Double.class, value -> Math.sqrt(value));
+
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ReactiveStreamsSpringBootApp.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ReactiveStreamsSpringBootApp.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ReactiveStreamsSpringBootApp.java
deleted file mode 100644
index 6440d6b..0000000
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/ReactiveStreamsSpringBootApp.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.example.reactive.streams;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-//CHECKSTYLE:OFF
-@SpringBootApplication
-public class ReactiveStreamsSpringBootApp {
-
- public static void main(String[] args) {
- SpringApplication.run(ReactiveStreamsSpringBootApp.class, args);
- }
-
-}
-//CHECKSTYLE:ON
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java
new file mode 100644
index 0000000..4a650d2
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/RestExample.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams;
+
+import org.apache.camel.Header;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
+import org.reactivestreams.Publisher;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * This example shows how a Camel route defined with the rest DSL can asynchronously request data from a
+ * reactive stream framework and continue processing.
+ *
+ * The exchange pattern is in-out from Camel to Reactor.
+ *
+ * Note: the Camel and reactor components are placed in the same configuration class for the sake of clarity,
+ * but they can be moved into their own files.
+ */
+@Configuration
+@ConditionalOnProperty("examples.others.rest-example")
+public class RestExample {
+
+    /**
+     * The reactor streams, registered as the "calculator" bean so that the route can call it by name.
+     */
+    @Component("calculator")
+    public static class RestExampleStreams {
+
+        /**
+         * Adds two numbers asynchronously. This method is called by the Camel route below.
+         *
+         * @param num1 publisher bound to the {@code num1} header
+         * @param num2 publisher bound to the {@code num2} header
+         * @return a publisher that pairs the items of both inputs and emits the sum of each pair
+         */
+        public Publisher<Long> sum(@Header("num1") Publisher<Long> num1, @Header("num2") Publisher<Long> num2) {
+            return Flux.from(num1).zipWith(num2)
+                .map(t -> t.getT1() + t.getT2());
+        }
+
+    }
+
+
+    /**
+     * The Camel Configuration.
+     */
+    @Component
+    public static class RestExampleRoutes extends RouteBuilder {
+
+        /**
+         * Defines the REST endpoint that converts its two path values to {@code Long}, delegates the
+         * computation to the "calculator" bean and renders the result as plain text.
+         */
+        @Override
+        public void configure() throws Exception {
+
+            // The full path should be eg.: http://localhost:8080/camel/sum/23/31
+            rest().get("/sum/{num1}/{num2}")
+                .produces("text/plain")
+                .route()
+                // Simple functions must be enclosed in ${...}: the token-less form only works through
+                // the legacy backwards-compatibility parser
+                .setHeader("num1").simple("${headerAs(num1,Long)}")
+                .setHeader("num2").simple("${headerAs(num2,Long)}")
+                .bean("calculator", "sum")
+                // The bean returns a Publisher: unwrap it so that the body holds the computed value
+                .process(new UnwrapStreamProcessor())
+                .setBody().simple("The result is: ${body}");
+
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleCamelRoutes.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleCamelRoutes.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleCamelRoutes.java
deleted file mode 100644
index cc5b59b..0000000
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleCamelRoutes.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.example.reactive.streams;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.springframework.stereotype.Component;
-
-@Component
-public class SampleCamelRoutes extends RouteBuilder {
-
- @Override
- public void configure() throws Exception {
-
- // Generating numbers every 3 seconds and forwarding to the stream "numbers"
- from("timer:clock?period=3000")
- .setBody().header(Exchange.TIMER_COUNTER)
- .to("reactive-streams:numbers");
-
- // Generating strings every 2.9 seconds and forwarding to the stream "strings"
- from("timer:clock2?period=2900&delay=1000")
- .setBody().simple("Hello World ${header.CamelTimerCounter}!")
- .to("reactive-streams:strings");
-
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleReactiveStreams.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleReactiveStreams.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleReactiveStreams.java
deleted file mode 100644
index 271e209..0000000
--- a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/SampleReactiveStreams.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.example.reactive.streams;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
-import org.reactivestreams.Publisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-
-@Component
-public class SampleReactiveStreams {
-
- private static final Logger LOG = LoggerFactory.getLogger(SampleReactiveStreams.class);
-
- @Autowired
- private CamelReactiveStreamsService camelStreams;
-
- @PostConstruct
- public void configure() {
-
- Publisher<Integer> numbers = camelStreams.getPublisher("numbers", Integer.class);
- Publisher<String> strings = camelStreams.getPublisher("strings", String.class);
-
- Flux.from(numbers)
- .zipWith(strings)
- .map(tuple -> "Seq: " + tuple.getT1() + " - " + tuple.getT2())
- .doOnNext(LOG::info)
- .subscribe();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/app/ReactiveStreamsSpringBootApp.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/app/ReactiveStreamsSpringBootApp.java b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/app/ReactiveStreamsSpringBootApp.java
new file mode 100644
index 0000000..10a62a1
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/java/org/apache/camel/example/reactive/streams/app/ReactiveStreamsSpringBootApp.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example.reactive.streams.app;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+//CHECKSTYLE:OFF
+@SpringBootApplication
+public class ReactiveStreamsSpringBootApp {
+
+ public static void main(String[] args) {
+ SpringApplication.run(ReactiveStreamsSpringBootApp.class, args);
+ }
+
+}
+//CHECKSTYLE:ON
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories b/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..8be4929
--- /dev/null
+++ b/examples/camel-example-reactive-streams/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.apache.camel.example.reactive.streams.BasicCamelToReactorExample,\
+org.apache.camel.example.reactive.streams.BasicReactorToCamelExample,\
+org.apache.camel.example.reactive.streams.BasicReactorToCamelInOutExample,\
+org.apache.camel.example.reactive.streams.BasicCamelToReactorInOutExample,\
+org.apache.camel.example.reactive.streams.RestExample
+
http://git-wip-us.apache.org/repos/asf/camel/blob/b4f75905/examples/camel-example-reactive-streams/src/main/resources/application.yml
----------------------------------------------------------------------
diff --git a/examples/camel-example-reactive-streams/src/main/resources/application.yml b/examples/camel-example-reactive-streams/src/main/resources/application.yml
index d5e4f9b..eeeb3ac 100644
--- a/examples/camel-example-reactive-streams/src/main/resources/application.yml
+++ b/examples/camel-example-reactive-streams/src/main/resources/application.yml
@@ -15,3 +15,15 @@
## limitations under the License.
## ------------------------------------------------------------------------
+# All examples are enabled by default.
+# You can turn them on or off here.
+examples:
+ basic:
+ camel-to-reactor: true
+ reactor-to-camel: true
+ camel-to-reactor-in-out: true
+ reactor-to-camel-in-out: true
+ others:
+ rest-example: true
+
+
[2/2] camel git commit: CAMEL-10612: allow a stream to request data
to Camel
Posted by nf...@apache.org.
CAMEL-10612: allow a stream to request data to Camel
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5108cb51
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5108cb51
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5108cb51
Branch: refs/heads/master
Commit: 5108cb512f326ff690337f9e1fcfdaffa089601b
Parents: 1885db2
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Thu Feb 2 10:58:14 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Thu Feb 2 17:06:20 2017 +0100
----------------------------------------------------------------------
.../api/CamelReactiveStreamsService.java | 22 ++
.../engine/CamelReactiveStreamsServiceImpl.java | 55 +++-
.../streams/engine/CamelSubscriber.java | 5 +
.../streams/engine/DelayedMonoPublisher.java | 188 ++++++++++++++
.../streams/DelayedMonoPublisherTest.java | 257 +++++++++++++++++++
.../reactive/streams/ExchangeRequestTest.java | 89 +++++++
.../support/ReactiveStreamsTestService.java | 10 +
7 files changed, 625 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index b865a48..3a7acd4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -77,6 +77,28 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
*/
<T> Subscriber<T> getSubscriber(String name, Class<T> type);
+ /**
+ * Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
+ * the resulting exchange or an error.
+ *
+ * The default implementation ({@code CamelReactiveStreamsServiceImpl}) uses {@code data} as-is when it is
+ * already an {@link Exchange}; any other object becomes the body of a new in-out exchange. It throws an
+ * {@link IllegalStateException} when no consumer is attached to the stream.
+ *
+ * @param name the stream name
+ * @param data the data to push
+ * @return a publisher with the resulting exchange
+ */
+ Publisher<Exchange> request(String name, Object data);
+
+ /**
+ * Pushes the given data into the specified Camel stream and returns a Publisher (mono) holding
+ * the exchange output or an error.
+ *
+ * The default implementation ({@code CamelReactiveStreamsServiceImpl}) delegates to
+ * {@link #request(String, Object)} and wraps the result in a converting publisher for the given type.
+ *
+ * @param name the stream name
+ * @param data the data to push
+ * @param type the type to which the output should be converted
+ * @param <T> the generic type of the resulting Publisher
+ * @return a publisher with the resulting data
+ */
+ <T> Publisher<T> request(String name, Object data, Class<T> type);
+
/*
* Methods for Camel producers.
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 7b285a6..edffb9e 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
@@ -29,10 +30,14 @@ import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServi
import org.apache.camel.component.reactive.streams.api.DispatchCallback;
import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.spi.Synchronization;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
-
+/**
+ * The default implementation of the reactive streams service.
+ */
public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsService {
private CamelContext context;
@@ -100,6 +105,54 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
getPayloadPublisher(name).publish(payload);
}
+ /**
+  * Sends the given data to the named stream and returns a publisher for the resulting exchange.
+  * An {@link Exchange} is forwarded untouched; anything else is wrapped as the body of a new
+  * in-out exchange.
+  */
+ @Override
+ public Publisher<Exchange> request(String name, Object data) {
+     if (data instanceof Exchange) {
+         // The caller already prepared an exchange: forward it without changes
+         return doRequest(name, (Exchange) data);
+     }
+
+     Exchange wrapper = new DefaultExchange(context);
+     wrapper.setPattern(ExchangePattern.InOut);
+     wrapper.getIn().setBody(data);
+     return doRequest(name, wrapper);
+ }
+
+ /**
+  * Same as {@link #request(String, Object)}, with the result wrapped in a publisher converting
+  * to the requested type.
+  */
+ @Override
+ public <T> Publisher<T> request(String name, Object data, Class<T> type) {
+     Publisher<Exchange> exchanges = request(name, data);
+     return new ConvertingPublisher<>(exchanges, type);
+ }
+
+ /**
+  * Hands the exchange to the consumer attached to the named stream and returns a publisher that is
+  * resolved from the exchange's completion callback.
+  *
+  * @param name the stream name
+  * @param data the exchange to process
+  * @return a publisher resolved with the exchange on completion, or with an error on failure
+  * @throws IllegalStateException if no consumer is attached to the stream
+  */
+ protected Publisher<Exchange> doRequest(String name, Exchange data) {
+     ReactiveStreamsConsumer target = getSubscriber(name).getConsumer();
+     if (target == null) {
+         throw new IllegalStateException("No consumers attached to the stream " + name);
+     }
+
+     DelayedMonoPublisher<Exchange> result = new DelayedMonoPublisher<>(this.workerPool);
+
+     // Resolve the publisher once the exchange has been fully processed
+     Synchronization completionHook = new Synchronization() {
+         @Override
+         public void onComplete(Exchange exchange) {
+             result.setData(exchange);
+         }
+
+         @Override
+         public void onFailure(Exchange exchange) {
+             // The publisher rejects null errors, so fall back to a generic one
+             Throwable failure = exchange.getException();
+             result.setException(failure != null ? failure : new IllegalStateException("Unknown Exception"));
+         }
+     };
+     data.addOnCompletion(completionHook);
+
+     // The outcome is delivered through the completion hook, so the async callback has nothing to do
+     target.process(data, doneSync -> {
+     });
+
+     return result;
+ }
private CamelPublisher getPayloadPublisher(String name) {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index eda2863..098d0ca 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -64,6 +64,10 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
refill();
}
+ /**
+  * Returns the consumer currently attached to this subscriber, or {@code null} if there is none.
+  */
+ public ReactiveStreamsConsumer getConsumer() {
+     synchronized (this) {
+         return consumer;
+     }
+ }
+
public void detachConsumer() {
synchronized (this) {
this.consumer = null;
@@ -86,6 +90,7 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
}
if (!allowed) {
+ LOG.warn("There is another active subscription: cancelled");
subscription.cancel();
} else {
refill();
http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
new file mode 100644
index 0000000..ef8ece1
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.engine;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Publish a single item as soon as it's available.
+ *
+ * The publisher is resolved at most once, either with a value ({@link #setData(Object)}) or with an error
+ * ({@link #setException(Throwable)}). Every subscriber that has requested at least one item receives
+ * {@code onNext} followed by {@code onComplete}, or {@code onError}, regardless of whether it subscribed
+ * before or after the publisher was resolved. Signals are delivered from the worker pool given to the
+ * constructor; only an invalid (non-positive) request is rejected from the caller's thread.
+ */
+public class DelayedMonoPublisher<T> implements Publisher<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedMonoPublisher.class);
+
+    private final ExecutorService workerPool;
+
+    private volatile T data;
+
+    private volatile Throwable exception;
+
+    private final List<MonoSubscription> subscriptions = new CopyOnWriteArrayList<>();
+
+    // true while a flush task is scheduled or running, so that at most one is active at a time
+    private final AtomicBoolean flushing = new AtomicBoolean(false);
+
+    /**
+     * @param workerPool the executor used to deliver signals to the subscribers
+     */
+    public DelayedMonoPublisher(ExecutorService workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> subscriber) {
+        Objects.requireNonNull(subscriber, "subscriber must not be null");
+        MonoSubscription sub = new MonoSubscription(subscriber);
+        subscriptions.add(sub);
+        subscriber.onSubscribe(sub);
+        flushCycle();
+    }
+
+    /**
+     * @return the value set on this publisher, or {@code null} if none has been set
+     */
+    public T getData() {
+        return data;
+    }
+
+    /**
+     * Resolves the publisher with a value.
+     *
+     * @param data the value to publish, must not be {@code null}
+     * @throws IllegalStateException if a value or an exception has already been set
+     */
+    public void setData(T data) {
+        Objects.requireNonNull(data, "data must be not null");
+        // check and assignment must be atomic, otherwise two concurrent callers could both succeed
+        synchronized (this) {
+            checkNotResolved();
+            this.data = data;
+        }
+        flushCycle();
+    }
+
+    /**
+     * @return the exception set on this publisher, or {@code null} if none has been set
+     */
+    public Throwable getException() {
+        return exception;
+    }
+
+    /**
+     * Resolves the publisher with an error.
+     *
+     * @param exception the error to publish, must not be {@code null}
+     * @throws IllegalStateException if a value or an exception has already been set
+     */
+    public void setException(Throwable exception) {
+        Objects.requireNonNull(exception, "exception must be not null");
+        synchronized (this) {
+            checkNotResolved();
+            this.exception = exception;
+        }
+        flushCycle();
+    }
+
+    /**
+     * Fails if the publisher has already been resolved. Must be called while holding the lock on this object.
+     */
+    private void checkNotResolved() {
+        if (this.data != null) {
+            throw new IllegalStateException("data has already been set");
+        } else if (this.exception != null) {
+            throw new IllegalStateException("an exception has already been set");
+        }
+    }
+
+    /**
+     * Schedules a flush on the worker pool, unless one is already scheduled or running.
+     */
+    private void flushCycle() {
+        if (!flushing.compareAndSet(false, true)) {
+            // the active flush re-checks the subscriptions after releasing the flag
+            return;
+        }
+
+        try {
+            workerPool.execute(this::runFlush);
+        } catch (RuntimeException e) {
+            // e.g. the pool rejected the task: release the flag, or no flush could ever be scheduled again
+            flushing.set(false);
+            throw e;
+        }
+    }
+
+    /**
+     * Delivers the result to all ready subscriptions and drops the terminated ones.
+     */
+    private void runFlush() {
+        try {
+            List<MonoSubscription> completed = new LinkedList<>();
+            for (MonoSubscription sub : this.subscriptions) {
+                sub.flush();
+                if (sub.isTerminated()) {
+                    completed.add(sub);
+                }
+            }
+            this.subscriptions.removeAll(completed);
+        } finally {
+            flushing.set(false);
+        }
+
+        // A subscription may have become ready while the flag was held (its own flushCycle() call was
+        // ignored): run again in that case
+        for (MonoSubscription sub : this.subscriptions) {
+            if (sub.isReady()) {
+                flushCycle();
+                return;
+            }
+        }
+    }
+
+    private final class MonoSubscription implements Subscription {
+
+        private volatile boolean terminated;
+
+        private volatile boolean requested;
+
+        private final Subscriber<? super T> subscriber;
+
+        private MonoSubscription(Subscriber<? super T> subscriber) {
+            this.subscriber = subscriber;
+        }
+
+        @Override
+        public void request(long l) {
+            if (l <= 0) {
+                synchronized (this) {
+                    if (terminated) {
+                        // requests on a cancelled or completed subscription are no-ops (rules 3.6 and 3.7)
+                        return;
+                    }
+                    // terminate first, so that no other signal can follow the error
+                    terminated = true;
+                }
+                subscriber.onError(new IllegalArgumentException("3.9"));
+            } else {
+                synchronized (this) {
+                    if (terminated) {
+                        return;
+                    }
+                    requested = true;
+                }
+            }
+
+            // The result may already be available: without this, a subscriber requesting after
+            // onSubscribe() has returned would never be served. It also lets a terminated
+            // subscription be removed from the list.
+            flushCycle();
+        }
+
+        /**
+         * Delivers the result to the subscriber if the subscription is ready, terminating it.
+         */
+        public void flush() {
+            synchronized (this) {
+                if (!isReady()) {
+                    return;
+                }
+
+                terminated = true;
+            }
+
+            T item = data;
+            if (item != null) {
+                subscriber.onNext(item);
+                subscriber.onComplete();
+            } else {
+                subscriber.onError(exception);
+            }
+        }
+
+        public boolean isTerminated() {
+            return terminated;
+        }
+
+        /**
+         * @return true if the subscriber is still active, has requested data and a result is available
+         */
+        public boolean isReady() {
+            return !terminated && requested && (data != null || exception != null);
+        }
+
+        @Override
+        public synchronized void cancel() {
+            terminated = true;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
new file mode 100644
index 0000000..011225e
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.component.reactive.streams.engine.DelayedMonoPublisher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class DelayedMonoPublisherTest {
+
+ private ExecutorService service;
+
+ @Before
+ public void init() {
+ service = new ScheduledThreadPoolExecutor(3);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ service.shutdown();
+ service.awaitTermination(1, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testAlreadyAvailable() throws Exception {
+
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+ pub.setData(5);
+
+ LinkedList<Integer> data = new LinkedList<>();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Flowable.fromPublisher(pub)
+ .doOnNext(data::add)
+ .doOnComplete(latch::countDown)
+ .subscribe();
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ assertEquals(1, data.size());
+ assertEquals(5, data.get(0).intValue());
+ }
+
+ @Test
+ public void testExceptionAlreadyAvailable() throws Exception {
+
+ Exception ex = new RuntimeException("An exception");
+
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+ pub.setException(ex);
+
+ LinkedList<Throwable> exceptions = new LinkedList<>();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Flowable.fromPublisher(pub)
+ .doOnError(exceptions::add)
+ .doOnError(e -> latch.countDown())
+ .subscribe();
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ assertEquals(1, exceptions.size());
+ assertEquals(ex, exceptions.get(0));
+ }
+
+ @Test
+ public void testAvailableSoon() throws Exception {
+
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+ LinkedList<Integer> data = new LinkedList<>();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Flowable.fromPublisher(pub)
+ .doOnNext(data::add)
+ .doOnComplete(latch::countDown)
+ .subscribe();
+
+ Thread.yield();
+ pub.setData(5);
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ assertEquals(1, data.size());
+ assertEquals(5, data.get(0).intValue());
+ }
+
+ @Test
+ public void testAvailableLater() throws Exception {
+
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+ LinkedList<Integer> data = new LinkedList<>();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Flowable.fromPublisher(pub)
+ .doOnNext(data::add)
+ .doOnComplete(latch::countDown)
+ .subscribe();
+
+ Thread.sleep(200);
+ pub.setData(5);
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ assertEquals(1, data.size());
+ assertEquals(5, data.get(0).intValue());
+ }
+
+ @Test
+ public void testMultipleSubscribers() throws Exception {
+
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+ ConcurrentLinkedDeque<Integer> data = new ConcurrentLinkedDeque<>();
+ CountDownLatch latch = new CountDownLatch(2);
+
+ Flowable.fromPublisher(pub)
+ .doOnNext(data::add)
+ .doOnComplete(latch::countDown)
+ .subscribe();
+
+ Flowable.fromPublisher(pub)
+ .doOnNext(data::add)
+ .doOnComplete(latch::countDown)
+ .subscribe();
+
+ Thread.sleep(200);
+ pub.setData(5);
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ assertEquals(2, data.size());
+ for (Integer n : data) {
+ assertEquals(5, n.intValue());
+ }
+ }
+
+ @Test
+ public void testMultipleSubscribersMixedArrival() throws Exception {
+
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+ ConcurrentLinkedDeque<Integer> data = new ConcurrentLinkedDeque<>();
+ CountDownLatch latch = new CountDownLatch(2);
+
+ Flowable.fromPublisher(pub)
+ .doOnNext(data::add)
+ .doOnComplete(latch::countDown)
+ .subscribe();
+
+ Thread.sleep(200);
+ pub.setData(5);
+
+ Flowable.fromPublisher(pub)
+ .doOnNext(data::add)
+ .doOnComplete(latch::countDown)
+ .subscribe();
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ assertEquals(2, data.size());
+ for (Integer n : data) {
+ assertEquals(5, n.intValue());
+ }
+ }
+
+ @Test
+ public void testMultipleSubscribersMixedArrivalException() throws Exception {
+
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+
+ Exception ex = new RuntimeException("An exception");
+
+ ConcurrentLinkedDeque<Throwable> exceptions = new ConcurrentLinkedDeque<>();
+ CountDownLatch latch = new CountDownLatch(2);
+
+ Flowable.fromPublisher(pub)
+ .doOnError(exceptions::add)
+ .doOnError(e -> latch.countDown())
+ .subscribe();
+
+ Thread.sleep(200);
+ pub.setException(ex);
+
+ Flowable.fromPublisher(pub)
+ .doOnError(exceptions::add)
+ .doOnError(e -> latch.countDown())
+ .subscribe();
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ assertEquals(2, exceptions.size());
+ for (Throwable t : exceptions) {
+ assertEquals(ex, t);
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testDataOrExceptionAllowed() throws Exception {
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+ Exception ex = new RuntimeException("An exception");
+ pub.setException(ex);
+ pub.setData(1);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testDataOrExceptionAllowed2() throws Exception {
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+ pub.setData(1);
+ Exception ex = new RuntimeException("An exception");
+ pub.setException(ex);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testOnlyOneDataAllowed() throws Exception {
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+ pub.setData(1);
+ pub.setData(2);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testOnlyOneExceptionAllowed() throws Exception {
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+ pub.setException(new RuntimeException("An exception"));
+ pub.setException(new RuntimeException("An exception"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
new file mode 100644
index 0000000..4e96ac6
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/ExchangeRequestTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+
+
+/**
+ * Tests the {@code request} operations of {@link CamelReactiveStreamsService} against the two
+ * {@code reactive-streams} routes defined in {@link #createRouteBuilder()}.
+ */
+public class ExchangeRequestTest extends CamelTestSupport {
+
+    /**
+     * Requests the {@code data} stream with an empty exchange and expects the body {@code "123"}.
+     */
+    @Test
+    public void testStreamRequest() throws Exception {
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+        Publisher<Exchange> reply = camel.request("data", new DefaultExchange(context));
+
+        Exchange result = Flowable.fromPublisher(reply).blockingFirst();
+        assertNotNull(result);
+
+        String body = result.getIn().getBody(String.class);
+        assertNotNull(body);
+        assertEquals("123", body);
+    }
+
+    /**
+     * Requests the {@code plusOne} stream with {@code 1L} and expects the Integer {@code 2}.
+     */
+    @Test
+    public void testInteraction() throws Exception {
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+        Publisher<Integer> reply = camel.request("plusOne", 1L, Integer.class);
+
+        Integer result = Flowable.fromPublisher(reply).blockingFirst();
+
+        assertNotNull(result);
+        assertEquals(2, result.intValue());
+    }
+
+    /**
+     * Sends 1, 2 and 3 through the {@code plusOne} stream and expects the results to add up to 9.
+     */
+    @Test
+    public void testMultipleInteractions() throws Exception {
+        CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
+
+        Integer total = Flowable.just(1, 2, 3)
+            .flatMap(item -> camel.request("plusOne", item, Integer.class))
+            .reduce(Integer::sum)
+            .blockingGet();
+
+        assertNotNull(total);
+        assertEquals(9, total.intValue());
+    }
+
+    /**
+     * Defines the routes under test: {@code data} replaces the body with the constant
+     * {@code "123"}; {@code plusOne} reads the body as an Integer, adds one and logs it.
+     */
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("reactive-streams:data")
+                    .setBody().constant("123");
+
+                from("reactive-streams:plusOne")
+                    .setBody().body(Integer.class, value -> value + 1)
+                    .log("Hello ${body}");
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/5108cb51/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 0301757..5288263 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -104,6 +104,16 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
}
+ /**
+  * Not implemented by this test service: ignores {@code name} and {@code data}
+  * and always returns {@code null}.
+  */
+ @Override
+ public Publisher<Exchange> request(String name, Object data) {
+ return null;
+ }
+
+ /**
+  * Not implemented by this test service: ignores {@code name}, {@code data} and {@code type}
+  * and always returns {@code null}.
+  */
+ @Override
+ public <T> Publisher<T> request(String name, Object data, Class<T> type) {
+ return null;
+ }
+
public String getName() {
return name;
}