You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/02/03 12:47:54 UTC
[1/2] camel git commit: CAMEL-10612: added a bunch of client API
Repository: camel
Updated Branches:
refs/heads/master 0f9b93b03 -> 2648a301f
CAMEL-10612: added a bunch of client API
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2648a301
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2648a301
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2648a301
Branch: refs/heads/master
Commit: 2648a301fee1d0a03f42fb97ce91eef2a310641e
Parents: a7e838d
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Feb 3 13:46:36 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Feb 3 13:46:47 2017 +0100
----------------------------------------------------------------------
.../api/CamelReactiveStreamsService.java | 105 +++++++++
.../engine/CamelReactiveStreamsServiceImpl.java | 137 +++++++++---
.../streams/util/ConvertingPublisher.java | 6 +-
.../streams/util/UnwrapStreamProcessor.java | 13 +-
.../reactive/streams/BeanCallTest.java | 4 +-
.../reactive/streams/DirectClientAPITest.java | 212 +++++++++++++++++++
.../support/ReactiveStreamsTestService.java | 40 ++++
.../support/ReactiveStreamsTestSupport.java | 38 ++++
8 files changed, 526 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index 03ca6a0..6b639cf 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -126,6 +126,111 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
<T> Function<Object, Publisher<T>> request(String name, Class<T> type);
/*
+ * Direct client API methods
+ */
+
+ /**
+ * Creates a new stream from the endpoint URI (used as Camel Consumer) and returns
+ * the associated {@code Publisher}.
+ *
+ * If a stream has already been created, the existing {@link Publisher} is returned.
+ *
+ * @param uri the consumer uri
+ * @return the publisher associated to the uri
+ */
+ Publisher<Exchange> publishURI(String uri);
+
+ /**
+ * Creates a new stream of the given type from the endpoint URI (used as Camel Consumer) and returns
+ * the associated {@code Publisher}.
+ *
+ * If a stream has already been created, the existing {@link Publisher} is returned.
+ *
+ * @param uri the consumer uri
+ * @param type the type of items emitted by the publisher
+ * @param <T> the type to which Camel should convert exchanges
+ * @return the publisher associated to the uri
+ */
+ <T> Publisher<T> publishURI(String uri, Class<T> type);
+
+ /**
+ * Creates a new route that uses the endpoint URI as producer, pushes the given data to the route
+ * and returns a {@code Publisher} that will eventually return the resulting exchange or an error.
+ *
+ * @param uri the producer uri
+ * @param data the data to push
+ * @return a publisher with the resulting exchange
+ */
+ Publisher<Exchange> requestURI(String uri, Object data);
+
+ /**
+ * Creates a new route that uses the endpoint URI as producer, and returns a
+ * function that pushes the data into the route and returns the
+ * {@code Publisher} that holds the resulting exchange or the error.
+ *
+ *
+ * This is a curried version of {@link CamelReactiveStreamsService#requestURI(String, Object)}.
+ *
+ * @param uri the producer uri
+ * @return a function that returns a publisher with the resulting exchange
+ */
+ Function<?, ? extends Publisher<Exchange>> requestURI(String uri);
+
+ /**
+ * Creates a new route that uses the endpoint URI as producer, pushes the given data to the route
+ * and returns a {@code Publisher} that will eventually return the exchange output or an error.
+ *
+ * @param uri the producer uri
+ * @param data the data to push
+ * @param type the type to which the output should be converted
+ * @param <T> the generic type of the resulting Publisher
+ * @return a publisher with the resulting data
+ */
+ <T> Publisher<T> requestURI(String uri, Object data, Class<T> type);
+
+ /**
+ * Creates a new route that uses the endpoint URI as producer, and returns a
+ * function that pushes the data into the route and returns the
+ * {@code Publisher} that holds the exchange output or an error.
+ *
+ * This is a curried version of {@link CamelReactiveStreamsService#requestURI(String, Object, Class)}.
+ *
+ * @param uri the producer uri
+ * @param type the type to which the output should be converted
+ * @param <T> the generic type of the resulting Publisher
+ * @return a function that returns a publisher with the resulting data
+ */
+ <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type);
+
+ /**
+ * Adds a processing step at the specified endpoint uri (usually a "direct:name") that delegates
+ * to the given reactive processor.
+ *
+ * The processor receives a {@link Publisher} of exchanges and returns an object.
+ * If the output of the processor is a {@link Publisher}, it will be unwrapped before
+ * delivering the result to the source route.
+ *
+ * @param uri the uri where the processor should be attached
+ * @param processor the reactive processor
+ */
+ void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor);
+
+ /**
+ * Adds a processing step at the specified endpoint uri (usually a "direct:name") that delegates
+ * to the given reactive processor.
+ *
+ * The processor receives a {@link Publisher} of items of the given type and returns an object.
+ * If the output of the processor is a {@link Publisher}, it will be unwrapped before
+ * delivering the result to the source route.
+ *
+ * @param uri the uri where the processor should be attached
+ * @param type the type to which the body of the exchange should be converted
+ * @param <T> the generic type of the Publisher that should be processed
+ * @param processor the reactive processor
+ */
+ <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor);
+
+ /*
* Methods for Camel producers.
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 4b67ce0..f5f5f4e 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -16,14 +16,15 @@
*/
package org.apache.camel.component.reactive.streams.engine;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
@@ -31,6 +32,8 @@ import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServi
import org.apache.camel.component.reactive.streams.api.DispatchCallback;
import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
+import org.apache.camel.component.reactive.streams.util.MonoPublisher;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.Synchronization;
import org.reactivestreams.Publisher;
@@ -45,9 +48,13 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
private ExecutorService workerPool;
- private Map<String, CamelPublisher> publishers = new HashMap<>();
+ private final Map<String, CamelPublisher> publishers = new ConcurrentHashMap<>();
- private final Map<String, CamelSubscriber> subscribers = new HashMap<>();
+ private final Map<String, CamelSubscriber> subscribers = new ConcurrentHashMap<>();
+
+ private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>();
+
+ private final Map<String, String> requestedUriToStream = new ConcurrentHashMap<>();
public CamelReactiveStreamsServiceImpl() {
}
@@ -82,13 +89,8 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
@Override
public CamelSubscriber getSubscriber(String name) {
- synchronized (this) {
- if (!subscribers.containsKey(name)) {
- CamelSubscriber sub = new CamelSubscriber(name);
- subscribers.put(name, sub);
- }
- return subscribers.get(name);
- }
+ subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name));
+ return subscribers.get(name);
}
@SuppressWarnings("unchecked")
@@ -108,15 +110,7 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
@Override
public Publisher<Exchange> request(String name, Object data) {
- Exchange exchange;
- if (data instanceof Exchange) {
- exchange = (Exchange) data;
- } else {
- exchange = new DefaultExchange(context);
- exchange.setPattern(ExchangePattern.InOut);
- exchange.getIn().setBody(data);
- }
-
+ Exchange exchange = convertToExchange(data);
return doRequest(name, exchange);
}
@@ -166,17 +160,99 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
}
private CamelPublisher getPayloadPublisher(String name) {
- synchronized (this) {
- if (!publishers.containsKey(name)) {
- CamelPublisher publisher = new CamelPublisher(this.workerPool, this.context, name);
- publishers.put(name, publisher);
+ publishers.computeIfAbsent(name, n -> new CamelPublisher(this.workerPool, this.context, n));
+ return publishers.get(name);
+ }
+
+ @Override
+ public Publisher<Exchange> publishURI(String uri) {
+ publishedUriToStream.computeIfAbsent(uri, u -> {
+ try {
+ String uuid = context.getUuidGenerator().generateUuid();
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(u)
+ .to("reactive-streams:" + uuid);
+ }
+ }.addRoutesToCamelContext(context);
+
+ return uuid;
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e);
}
+ });
+ return getPublisher(publishedUriToStream.get(uri));
+ }
+
+ @Override
+ public <T> Publisher<T> publishURI(String uri, Class<T> type) {
+ return new ConvertingPublisher<T>(publishURI(uri), type);
+ }
+
+ @Override
+ public Publisher<Exchange> requestURI(String uri, Object data) {
+ requestedUriToStream.computeIfAbsent(uri, u -> {
+ try {
+ String uuid = context.getUuidGenerator().generateUuid();
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("reactive-streams:" + uuid)
+ .to(u);
+ }
+ }.addRoutesToCamelContext(context);
+
+ return uuid;
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e);
+ }
+ });
+ return request(requestedUriToStream.get(uri), data);
+ }
- return publishers.get(name);
+ @Override
+ public Function<?, ? extends Publisher<Exchange>> requestURI(String uri) {
+ return data -> requestURI(uri, data);
+ }
+
+ @Override
+ public <T> Publisher<T> requestURI(String uri, Object data, Class<T> type) {
+ return new ConvertingPublisher<T>(requestURI(uri, data), type);
+ }
+
+ @Override
+ public <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type) {
+ return data -> requestURI(uri, data, type);
+ }
+
+
+ @Override
+ public void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor) {
+ try {
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(uri)
+ .process(exchange -> {
+ Exchange copy = exchange.copy();
+ Object result = processor.apply(new MonoPublisher<>(copy));
+ exchange.getIn().setBody(result);
+ })
+ .process(new UnwrapStreamProcessor());
+ }
+ }.addRoutesToCamelContext(context);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + uri, e);
}
}
@Override
+ public <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
+ processFromURI(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type)));
+ }
+
+ @Override
public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
getSubscriber(name).attachConsumer(consumer);
}
@@ -206,4 +282,17 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
return this.context;
}
+ private Exchange convertToExchange(Object data) {
+ Exchange exchange;
+ if (data instanceof Exchange) {
+ exchange = (Exchange) data;
+ } else {
+ exchange = new DefaultExchange(context);
+ exchange.setPattern(ExchangePattern.InOut);
+ exchange.getIn().setBody(data);
+ }
+
+ return exchange;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
index 12ed7df..44f7e8c 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
@@ -76,7 +76,11 @@ public class ConvertingPublisher<R> implements Publisher<R> {
R r;
try {
- r = ex.getIn().getBody(type);
+ if (ex.hasOut()) {
+ r = ex.getOut().getBody(type);
+ } else {
+ r = ex.getIn().getBody(type);
+ }
} catch (TypeConversionException e) {
LOG.warn("Unable to convert body to the specified type: " + type.getName(), e);
r = null;
http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
index 3fb1a8a..c5bb03f 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
@@ -74,7 +74,18 @@ public class UnwrapStreamProcessor implements AsyncProcessor {
} else {
body = data;
}
- exchange.getIn().setBody(body);
+
+ if (body instanceof Exchange && !exchange.equals(body)) {
+ // copy into the original Exchange
+ Exchange copy = (Exchange) body;
+ exchange.setException(copy.getException());
+ exchange.setIn(copy.getIn());
+ exchange.setOut(copy.getOut());
+ exchange.getProperties().clear();
+ exchange.getProperties().putAll(copy.getProperties());
+ } else {
+ exchange.getIn().setBody(body);
+ }
}
});
http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
index 1b97382..3e714b5 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
@@ -31,9 +31,7 @@ import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.reactivestreams.Publisher;
-/**
- *
- */
+
public class BeanCallTest extends CamelTestSupport {
@Test
http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
new file mode 100644
index 0000000..cd62bcf
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.reactive.streams.support.ReactiveStreamsTestSupport;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+
+
+public class DirectClientAPITest extends ReactiveStreamsTestSupport {
+
+ @Test
+ public void testFromDirect() throws Exception {
+
+ Publisher<Integer> data = camel.publishURI("direct:endpoint", Integer.class);
+
+ BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
+
+ Flowable.fromPublisher(data)
+ .map(i -> -i)
+ .doOnNext(queue::add)
+ .subscribe();
+
+ context.start();
+ template.sendBody("direct:endpoint", 1);
+
+ Integer res = queue.poll(1, TimeUnit.SECONDS);
+ assertNotNull(res);
+ assertEquals(-1, res.intValue());
+ }
+
+ @Test
+ public void testFromDirectOnHotContext() throws Exception {
+
+ context.start();
+ Thread.sleep(200);
+
+ Publisher<Integer> data = camel.publishURI("direct:endpoint", Integer.class);
+
+ BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
+
+ Flowable.fromPublisher(data)
+ .map(i -> -i)
+ .doOnNext(queue::add)
+ .subscribe();
+
+ template.sendBody("direct:endpoint", 1);
+
+ Integer res = queue.poll(1, TimeUnit.SECONDS);
+ assertNotNull(res);
+ assertEquals(-1, res.intValue());
+ }
+
+ @Test
+ public void testDirectCall() throws Exception {
+ context.start();
+
+ BlockingQueue<String> queue = new LinkedBlockingDeque<>();
+
+ Flowable.just(1, 2, 3)
+ .flatMap(camel.requestURI("bean:hello", String.class)::apply)
+ .doOnNext(queue::add)
+ .subscribe();
+
+ for (int i = 1; i <= 3; i++) {
+ String res = queue.poll(1, TimeUnit.SECONDS);
+ assertEquals("Hello " + i, res);
+ }
+
+ }
+
+ @Test
+ public void testProxiedDirectCall() throws Exception {
+ context.start();
+
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:proxy")
+ .to("bean:hello")
+ .setBody().simple("proxy to ${body}");
+ }
+ }.addRoutesToCamelContext(context);
+
+ BlockingQueue<String> queue = new LinkedBlockingDeque<>();
+
+ Flowable.just(1, 2, 3)
+ .flatMap(camel.requestURI("direct:proxy", String.class)::apply)
+ .doOnNext(queue::add)
+ .subscribe();
+
+ for (int i = 1; i <= 3; i++) {
+ String res = queue.poll(1, TimeUnit.SECONDS);
+ assertEquals("proxy to Hello " + i, res);
+ }
+
+ }
+
+ @Test
+ public void testDirectCallFromCamel() throws Exception {
+
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:source")
+ .to("direct:stream")
+ .setBody().simple("after stream: ${body}")
+ .to("mock:dest");
+ }
+ }.addRoutesToCamelContext(context);
+
+ context.start();
+
+ camel.processFromURI("direct:stream", p ->
+ Flowable.fromPublisher(p)
+ .map(exchange -> {
+ int val = exchange.getIn().getBody(Integer.class);
+ exchange.getOut().setBody(-val);
+ return exchange;
+ })
+ );
+
+ for (int i = 1; i <= 3; i++) {
+ template.sendBody("direct:source", i);
+ }
+
+ MockEndpoint mock = getMockEndpoint("mock:dest");
+ mock.expectedMessageCount(3);
+ mock.assertIsSatisfied();
+
+ int id = 1;
+ for (Exchange ex : mock.getExchanges()) {
+ String content = ex.getIn().getBody(String.class);
+ assertEquals("after stream: " + (-id++), content);
+ }
+ }
+
+ @Test
+ public void testDirectCallFromCamelWithConversion() throws Exception {
+
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:source")
+ .to("direct:stream")
+ .setBody().simple("after stream: ${body}")
+ .to("mock:dest");
+ }
+ }.addRoutesToCamelContext(context);
+
+ context.start();
+
+ camel.processFromURI("direct:stream", Integer.class, p ->
+ Flowable.fromPublisher(p)
+ .map(i -> -i)
+ );
+
+ for (int i = 1; i <= 3; i++) {
+ template.sendBody("direct:source", i);
+ }
+
+ MockEndpoint mock = getMockEndpoint("mock:dest");
+ mock.expectedMessageCount(3);
+ mock.assertIsSatisfied();
+
+ int id = 1;
+ for (Exchange ex : mock.getExchanges()) {
+ String content = ex.getIn().getBody(String.class);
+ assertEquals("after stream: " + (-id++), content);
+ }
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ registry.bind("hello", new SampleBean());
+ return registry;
+ }
+
+ public static class SampleBean {
+
+ public String hello(String name) {
+ return "Hello " + name;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 6ab9c5e..186a9b5 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -126,6 +126,46 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
return null;
}
+ @Override
+ public Publisher<Exchange> publishURI(String uri) {
+ return null;
+ }
+
+ @Override
+ public <T> Publisher<T> publishURI(String uri, Class<T> type) {
+ return null;
+ }
+
+ @Override
+ public Publisher<Exchange> requestURI(String uri, Object data) {
+ return null;
+ }
+
+ @Override
+ public Function<?, ? extends Publisher<Exchange>> requestURI(String uri) {
+ return null;
+ }
+
+ @Override
+ public <T> Publisher<T> requestURI(String uri, Object data, Class<T> type) {
+ return null;
+ }
+
+ @Override
+ public <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type) {
+ return null;
+ }
+
+ @Override
+ public void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor) {
+
+ }
+
+ @Override
+ public <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
+
+ }
+
public String getName() {
return name;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java
new file mode 100644
index 0000000..0fb4b6e
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.support;
+
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+
+public class ReactiveStreamsTestSupport extends CamelTestSupport {
+
+ protected CamelReactiveStreamsService camel;
+
+ @Before
+ public void initReactiveStreamService() {
+ this.camel = CamelReactiveStreams.get(context);
+ }
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+}
[2/2] camel git commit: CAMEL-10612: fixing unwrap stream processor
with std and empty data
Posted by nf...@apache.org.
CAMEL-10612: fixing unwrap stream processor with std and empty data
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a7e838d9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a7e838d9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a7e838d9
Branch: refs/heads/master
Commit: a7e838d963b76ef0d943efef03f5d4c6eaaf4624
Parents: 0f9b93b
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Feb 3 10:01:17 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Feb 3 13:46:47 2017 +0100
----------------------------------------------------------------------
.../streams/util/UnwrapStreamProcessor.java | 6 +-
.../reactive/streams/BeanCallTest.java | 124 ++++++++++++++++++-
2 files changed, 126 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a7e838d9/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
index 6800ce0..3fb1a8a 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
@@ -78,8 +78,12 @@ public class UnwrapStreamProcessor implements AsyncProcessor {
}
});
+
+ return false;
}
- return false;
+
+ callback.done(true);
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/a7e838d9/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
index 5a532ae..1b97382 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.reactive.streams;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
import io.reactivex.Flowable;
import org.apache.camel.Exchange;
@@ -43,7 +47,6 @@ public class BeanCallTest extends CamelTestSupport {
from("direct:num")
.bean(BeanCallTest.this, "processBody")
.process(new UnwrapStreamProcessor()) // Can be removed?
- .split().body()
.to("mock:endpoint");
from("direct:handle")
@@ -76,7 +79,6 @@ public class BeanCallTest extends CamelTestSupport {
from("direct:num")
.bean(BeanCallTest.this, "processBodyWrongType")
.process(new UnwrapStreamProcessor()) // Can be removed?
- .split().body()
.to("mock:endpoint");
from("direct:handle")
@@ -108,7 +110,6 @@ public class BeanCallTest extends CamelTestSupport {
from("direct:num")
.bean(BeanCallTest.this, "processHeader")
.process(new UnwrapStreamProcessor()) // Can be removed?
- .split().body()
.to("mock:endpoint");
from("direct:handle")
@@ -129,6 +130,110 @@ public class BeanCallTest extends CamelTestSupport {
assertEquals("HelloHeader 2", exchange.getIn().getBody());
}
+ @Test
+ public void beanCallEmptyPublisherTest() throws Exception {
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ onException(Throwable.class).to("direct:handle").handled(true);
+
+ from("direct:num")
+ .bean(BeanCallTest.this, "processBodyEmpty")
+ .process(new UnwrapStreamProcessor()) // Can be removed?
+ .to("mock:endpoint");
+
+ from("direct:handle")
+ .setBody().constant("ERR")
+ .to("mock:endpoint");
+
+ }
+ }.addRoutesToCamelContext(context);
+
+ MockEndpoint mock = getMockEndpoint("mock:endpoint");
+ mock.expectedMessageCount(1);
+
+ context.start();
+
+ template.sendBody("direct:num", 1);
+ mock.assertIsSatisfied();
+
+ Exchange exchange = mock.getExchanges().get(0);
+ Object body = exchange.getIn().getBody();
+ assertEquals(new Integer(1), body); // unchanged
+ }
+
+ @Test
+ public void beanCallTwoElementsTest() throws Exception {
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ onException(Throwable.class).to("direct:handle").handled(true);
+
+ from("direct:num")
+ .bean(BeanCallTest.this, "processBodyTwoItems")
+ .process(new UnwrapStreamProcessor()) // Can be removed?
+ .to("mock:endpoint");
+
+ from("direct:handle")
+ .setBody().constant("ERR")
+ .to("mock:endpoint");
+
+ }
+ }.addRoutesToCamelContext(context);
+
+ MockEndpoint mock = getMockEndpoint("mock:endpoint");
+ mock.expectedMessageCount(1);
+
+ context.start();
+
+ template.sendBody("direct:num", 1);
+ mock.assertIsSatisfied();
+
+ Exchange exchange = mock.getExchanges().get(0);
+ Object body = exchange.getIn().getBody();
+ assertTrue(body instanceof Collection);
+ @SuppressWarnings("unchecked")
+ List<String> data = new LinkedList<>((Collection<String>) body);
+ assertListSize(data, 2);
+ assertEquals("HelloBody 1", data.get(0));
+ assertEquals("HelloBody 1", data.get(1));
+ }
+
+ @Test
+ public void beanCallStdReturnTypeTest() throws Exception {
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ onException(Throwable.class).to("direct:handle").handled(true);
+
+ from("direct:num")
+ .bean(BeanCallTest.this, "processBodyStd")
+ .process(new UnwrapStreamProcessor()) // Can be removed?
+ .to("mock:endpoint");
+
+ from("direct:handle")
+ .setBody().constant("ERR")
+ .to("mock:endpoint");
+
+ }
+ }.addRoutesToCamelContext(context);
+
+ MockEndpoint mock = getMockEndpoint("mock:endpoint");
+ mock.expectedMessageCount(1);
+
+ context.start();
+
+ template.sendBody("direct:num", 1);
+ mock.assertIsSatisfied();
+
+ Exchange exchange = mock.getExchanges().get(0);
+ Object body = exchange.getIn().getBody();
+ assertEquals("Hello", body);
+ }
+
public Publisher<String> processBody(Publisher<Integer> data) {
return Flowable.fromPublisher(data)
.map(l -> "HelloBody " + l);
@@ -144,6 +249,19 @@ public class BeanCallTest extends CamelTestSupport {
.map(l -> "HelloHeader " + l);
}
+ public Publisher<String> processBodyTwoItems(Publisher<Integer> data) {
+ return Flowable.fromPublisher(data).mergeWith(data)
+ .map(l -> "HelloBody " + l);
+ }
+
+ public Publisher<String> processBodyEmpty(Publisher<Integer> data) {
+ return Flowable.empty();
+ }
+
+ public String processBodyStd(Publisher<Integer> data) {
+ return "Hello";
+ }
+
@Override
public boolean isUseRouteBuilder() {
return false;