You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/02/03 12:47:54 UTC

[1/2] camel git commit: CAMEL-10612: added a bunch of client API

Repository: camel
Updated Branches:
  refs/heads/master 0f9b93b03 -> 2648a301f


CAMEL-10612: added a bunch of client API


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2648a301
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2648a301
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2648a301

Branch: refs/heads/master
Commit: 2648a301fee1d0a03f42fb97ce91eef2a310641e
Parents: a7e838d
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Feb 3 13:46:36 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Feb 3 13:46:47 2017 +0100

----------------------------------------------------------------------
 .../api/CamelReactiveStreamsService.java        | 105 +++++++++
 .../engine/CamelReactiveStreamsServiceImpl.java | 137 +++++++++---
 .../streams/util/ConvertingPublisher.java       |   6 +-
 .../streams/util/UnwrapStreamProcessor.java     |  13 +-
 .../reactive/streams/BeanCallTest.java          |   4 +-
 .../reactive/streams/DirectClientAPITest.java   | 212 +++++++++++++++++++
 .../support/ReactiveStreamsTestService.java     |  40 ++++
 .../support/ReactiveStreamsTestSupport.java     |  38 ++++
 8 files changed, 526 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index 03ca6a0..6b639cf 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -126,6 +126,111 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
     <T> Function<Object, Publisher<T>> request(String name, Class<T> type);
 
     /*
+     * Direct client API methods
+     */
+
+    /**
+     * Creates a new stream from the endpoint URI (used as Camel Consumer) and returns
+     * the associated {@code Publisher}.
+     *
+     * If a stream has already been created, the existing {@link Publisher} is returned.
+     *
+     * @param uri the consumer uri
+     * @return the publisher associated with the uri
+     */
+    Publisher<Exchange> publishURI(String uri);
+
+    /**
+     * Creates a new stream of the given type from the endpoint URI (used as Camel Consumer) and returns
+     * the associated {@code Publisher}.
+     *
+     * If a stream has already been created, the existing {@link Publisher} is returned.
+     *
+     * @param uri the consumer uri
+     * @param type the type of items emitted by the publisher
+     * @param <T> the type to which Camel should convert exchanges to
+     * @return the publisher associated with the uri
+     */
+    <T> Publisher<T> publishURI(String uri, Class<T> type);
+
+    /**
+     * Creates a new route that uses the endpoint URI as producer, pushes the given data to the route
+     * and returns a {@code Publisher} that will eventually return the resulting exchange or an error.
+     *
+     * @param uri the producer uri
+     * @param data the data to push
+     * @return a publisher with the resulting exchange
+     */
+    Publisher<Exchange> requestURI(String uri, Object data);
+
+    /**
+     * Creates a new route that uses the endpoint URI as producer, and returns a
+     * function that pushes the data into the route and returns the
+     * {@code Publisher} that holds the resulting exchange or the error.
+     *
+     *
+     * This is a curried version of {@link CamelReactiveStreamsService#requestURI(String, Object)}.
+     *
+     * @param uri the producer uri
+     * @return a function that returns a publisher with the resulting exchange
+     */
+    Function<?, ? extends Publisher<Exchange>> requestURI(String uri);
+
+    /**
+     * Creates a new route that uses the endpoint URI as producer, pushes the given data to the route
+     * and returns a {@code Publisher} that will eventually return the exchange output or an error.
+     *
+     * @param uri the producer uri
+     * @param data the data to push
+     * @param type  the type to which the output should be converted
+     * @param <T> the generic type of the resulting Publisher
+     * @return a publisher with the resulting data
+     */
+    <T> Publisher<T> requestURI(String uri, Object data, Class<T> type);
+
+    /**
+     * Creates a new route that uses the endpoint URI as producer, and returns a
+     * function that pushes the data into the route and returns the
+     * {@code Publisher} that holds the exchange output or an error.
+     *
+     * This is a curried version of {@link CamelReactiveStreamsService#requestURI(String, Object, Class)}.
+     *
+     * @param uri the producer uri
+     * @param type  the type to which the output should be converted
+     * @param <T> the generic type of the resulting Publisher
+     * @return a function that returns a publisher with the resulting data
+     */
+    <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type);
+
+    /**
+     * Adds a processing step at the specified endpoint uri (usually a "direct:name") that delegates
+     * to the given reactive processor.
+     *
+     * The processor receives a {@link Publisher} of exchanges and returns an object.
+     * If the output of the processor is a {@link Publisher}, it will be unwrapped before
+     * delivering the result to the source route.
+     *
+     * @param uri the uri where the processor should be attached
+     * @param processor the reactive processor
+     */
+    void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor);
+
+    /**
+     * Adds a processing step at the specified endpoint uri (usually a "direct:name") that delegates
+     * to the given reactive processor.
+     *
+     * The processor receives a {@link Publisher} of items of the given type and returns an object.
+     * If the output of the processor is a {@link Publisher}, it will be unwrapped before
+     * delivering the result to the source route.
+     *
+     * @param uri the uri where the processor should be attached
+     * @param type  the type to which the body of the exchange should be converted
+     * @param <T> the generic type of the Publisher that should be processed
+     * @param processor the reactive processor
+     */
+    <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor);
+
+    /*
      * Methods for Camel producers.
      */
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 4b67ce0..f5f5f4e 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -16,14 +16,15 @@
  */
 package org.apache.camel.component.reactive.streams.engine;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
@@ -31,6 +32,8 @@ import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServi
 import org.apache.camel.component.reactive.streams.api.DispatchCallback;
 import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
 import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
+import org.apache.camel.component.reactive.streams.util.MonoPublisher;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.Synchronization;
 import org.reactivestreams.Publisher;
@@ -45,9 +48,13 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
 
     private ExecutorService workerPool;
 
-    private Map<String, CamelPublisher> publishers = new HashMap<>();
+    private final Map<String, CamelPublisher> publishers = new ConcurrentHashMap<>();
 
-    private final Map<String, CamelSubscriber> subscribers = new HashMap<>();
+    private final Map<String, CamelSubscriber> subscribers = new ConcurrentHashMap<>();
+
+    private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>();
+
+    private final Map<String, String> requestedUriToStream = new ConcurrentHashMap<>();
 
     public CamelReactiveStreamsServiceImpl() {
     }
@@ -82,13 +89,8 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
 
     @Override
     public CamelSubscriber getSubscriber(String name) {
-        synchronized (this) {
-            if (!subscribers.containsKey(name)) {
-                CamelSubscriber sub = new CamelSubscriber(name);
-                subscribers.put(name, sub);
-            }
-            return subscribers.get(name);
-        }
+        subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name));
+        return subscribers.get(name);
     }
 
     @SuppressWarnings("unchecked")
@@ -108,15 +110,7 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
 
     @Override
     public Publisher<Exchange> request(String name, Object data) {
-        Exchange exchange;
-        if (data instanceof Exchange) {
-            exchange = (Exchange) data;
-        } else {
-            exchange = new DefaultExchange(context);
-            exchange.setPattern(ExchangePattern.InOut);
-            exchange.getIn().setBody(data);
-        }
-
+        Exchange exchange = convertToExchange(data);
         return doRequest(name, exchange);
     }
 
@@ -166,17 +160,99 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     }
 
     private CamelPublisher getPayloadPublisher(String name) {
-        synchronized (this) {
-            if (!publishers.containsKey(name)) {
-                CamelPublisher publisher = new CamelPublisher(this.workerPool, this.context, name);
-                publishers.put(name, publisher);
+        publishers.computeIfAbsent(name, n -> new CamelPublisher(this.workerPool, this.context, n));
+        return publishers.get(name);
+    }
+
+    @Override
+    public Publisher<Exchange> publishURI(String uri) {
+        publishedUriToStream.computeIfAbsent(uri, u -> {
+            try {
+                String uuid = context.getUuidGenerator().generateUuid();
+                new RouteBuilder() {
+                    @Override
+                    public void configure() throws Exception {
+                        from(u)
+                                .to("reactive-streams:" + uuid);
+                    }
+                }.addRoutesToCamelContext(context);
+
+                return uuid;
+            } catch (Exception e) {
+                throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e);
             }
+        });
+        return getPublisher(publishedUriToStream.get(uri));
+    }
+
+    @Override
+    public <T> Publisher<T> publishURI(String uri, Class<T> type) {
+        return new ConvertingPublisher<T>(publishURI(uri), type);
+    }
+
+    @Override
+    public Publisher<Exchange> requestURI(String uri, Object data) {
+        requestedUriToStream.computeIfAbsent(uri, u -> {
+            try {
+                String uuid = context.getUuidGenerator().generateUuid();
+                new RouteBuilder() {
+                    @Override
+                    public void configure() throws Exception {
+                        from("reactive-streams:" + uuid)
+                                .to(u);
+                    }
+                }.addRoutesToCamelContext(context);
+
+                return uuid;
+            } catch (Exception e) {
+                throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e);
+            }
+        });
+        return request(requestedUriToStream.get(uri), data);
+    }
 
-            return publishers.get(name);
+    @Override
+    public Function<?, ? extends Publisher<Exchange>> requestURI(String uri) {
+        return data -> requestURI(uri, data);
+    }
+
+    @Override
+    public <T> Publisher<T> requestURI(String uri, Object data, Class<T> type) {
+        return new ConvertingPublisher<T>(requestURI(uri, data), type);
+    }
+
+    @Override
+    public <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type) {
+        return data -> requestURI(uri, data, type);
+    }
+
+
+    @Override
+    public void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor) {
+        try {
+            new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from(uri)
+                            .process(exchange -> {
+                                Exchange copy = exchange.copy();
+                                Object result = processor.apply(new MonoPublisher<>(copy));
+                                exchange.getIn().setBody(result);
+                            })
+                            .process(new UnwrapStreamProcessor());
+                }
+            }.addRoutesToCamelContext(context);
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + uri, e);
         }
     }
 
     @Override
+    public <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
+        processFromURI(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type)));
+    }
+
+    @Override
     public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
         getSubscriber(name).attachConsumer(consumer);
     }
@@ -206,4 +282,17 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
         return this.context;
     }
 
+    private Exchange convertToExchange(Object data) {
+        Exchange exchange;
+        if (data instanceof Exchange) {
+            exchange = (Exchange) data;
+        } else {
+            exchange = new DefaultExchange(context);
+            exchange.setPattern(ExchangePattern.InOut);
+            exchange.getIn().setBody(data);
+        }
+
+        return exchange;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
index 12ed7df..44f7e8c 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
@@ -76,7 +76,11 @@ public class ConvertingPublisher<R> implements Publisher<R> {
 
                 R r;
                 try {
-                    r = ex.getIn().getBody(type);
+                    if (ex.hasOut()) {
+                        r = ex.getOut().getBody(type);
+                    } else {
+                        r = ex.getIn().getBody(type);
+                    }
                 } catch (TypeConversionException e) {
                     LOG.warn("Unable to convert body to the specified type: " + type.getName(), e);
                     r = null;

http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
index 3fb1a8a..c5bb03f 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
@@ -74,7 +74,18 @@ public class UnwrapStreamProcessor implements AsyncProcessor {
                     } else {
                         body = data;
                     }
-                    exchange.getIn().setBody(body);
+
+                    if (body instanceof Exchange && !exchange.equals(body)) {
+                        // copy into the original Exchange
+                        Exchange copy = (Exchange) body;
+                        exchange.setException(copy.getException());
+                        exchange.setIn(copy.getIn());
+                        exchange.setOut(copy.getOut());
+                        exchange.getProperties().clear();
+                        exchange.getProperties().putAll(copy.getProperties());
+                    } else {
+                        exchange.getIn().setBody(body);
+                    }
                 }
 
             });

http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
index 1b97382..3e714b5 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
@@ -31,9 +31,7 @@ import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 import org.reactivestreams.Publisher;
 
-/**
- *
- */
+
 public class BeanCallTest extends CamelTestSupport {
 
     @Test

http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
new file mode 100644
index 0000000..cd62bcf
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.reactive.streams.support.ReactiveStreamsTestSupport;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+
+
+public class DirectClientAPITest extends ReactiveStreamsTestSupport {
+
+    @Test
+    public void testFromDirect() throws Exception {
+
+        Publisher<Integer> data = camel.publishURI("direct:endpoint", Integer.class);
+
+        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
+
+        Flowable.fromPublisher(data)
+                .map(i -> -i)
+                .doOnNext(queue::add)
+                .subscribe();
+
+        context.start();
+        template.sendBody("direct:endpoint", 1);
+
+        Integer res = queue.poll(1, TimeUnit.SECONDS);
+        assertNotNull(res);
+        assertEquals(-1, res.intValue());
+    }
+
+    @Test
+    public void testFromDirectOnHotContext() throws Exception {
+
+        context.start();
+        Thread.sleep(200);
+
+        Publisher<Integer> data = camel.publishURI("direct:endpoint", Integer.class);
+
+        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
+
+        Flowable.fromPublisher(data)
+                .map(i -> -i)
+                .doOnNext(queue::add)
+                .subscribe();
+
+        template.sendBody("direct:endpoint", 1);
+
+        Integer res = queue.poll(1, TimeUnit.SECONDS);
+        assertNotNull(res);
+        assertEquals(-1, res.intValue());
+    }
+
+    @Test
+    public void testDirectCall() throws Exception {
+        context.start();
+
+        BlockingQueue<String> queue = new LinkedBlockingDeque<>();
+
+        Flowable.just(1, 2, 3)
+                .flatMap(camel.requestURI("bean:hello", String.class)::apply)
+                .doOnNext(queue::add)
+                .subscribe();
+
+        for (int i = 1; i <= 3; i++) {
+            String res = queue.poll(1, TimeUnit.SECONDS);
+            assertEquals("Hello " + i, res);
+        }
+
+    }
+
+    @Test
+    public void testProxiedDirectCall() throws Exception {
+        context.start();
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:proxy")
+                        .to("bean:hello")
+                        .setBody().simple("proxy to ${body}");
+            }
+        }.addRoutesToCamelContext(context);
+
+        BlockingQueue<String> queue = new LinkedBlockingDeque<>();
+
+        Flowable.just(1, 2, 3)
+                .flatMap(camel.requestURI("direct:proxy", String.class)::apply)
+                .doOnNext(queue::add)
+                .subscribe();
+
+        for (int i = 1; i <= 3; i++) {
+            String res = queue.poll(1, TimeUnit.SECONDS);
+            assertEquals("proxy to Hello " + i, res);
+        }
+
+    }
+
+    @Test
+    public void testDirectCallFromCamel() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:source")
+                        .to("direct:stream")
+                        .setBody().simple("after stream: ${body}")
+                        .to("mock:dest");
+            }
+        }.addRoutesToCamelContext(context);
+
+        context.start();
+
+        camel.processFromURI("direct:stream", p ->
+                Flowable.fromPublisher(p)
+                        .map(exchange -> {
+                            int val = exchange.getIn().getBody(Integer.class);
+                            exchange.getOut().setBody(-val);
+                            return exchange;
+                        })
+        );
+
+        for (int i = 1; i <= 3; i++) {
+            template.sendBody("direct:source", i);
+        }
+
+        MockEndpoint mock = getMockEndpoint("mock:dest");
+        mock.expectedMessageCount(3);
+        mock.assertIsSatisfied();
+
+        int id = 1;
+        for (Exchange ex : mock.getExchanges()) {
+            String content = ex.getIn().getBody(String.class);
+            assertEquals("after stream: " + (-id++), content);
+        }
+    }
+
+    @Test
+    public void testDirectCallFromCamelWithConversion() throws Exception {
+
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:source")
+                        .to("direct:stream")
+                        .setBody().simple("after stream: ${body}")
+                        .to("mock:dest");
+            }
+        }.addRoutesToCamelContext(context);
+
+        context.start();
+
+        camel.processFromURI("direct:stream", Integer.class, p ->
+                Flowable.fromPublisher(p)
+                        .map(i -> -i)
+        );
+
+        for (int i = 1; i <= 3; i++) {
+            template.sendBody("direct:source", i);
+        }
+
+        MockEndpoint mock = getMockEndpoint("mock:dest");
+        mock.expectedMessageCount(3);
+        mock.assertIsSatisfied();
+
+        int id = 1;
+        for (Exchange ex : mock.getExchanges()) {
+            String content = ex.getIn().getBody(String.class);
+            assertEquals("after stream: " + (-id++), content);
+        }
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("hello", new SampleBean());
+        return registry;
+    }
+
+    public static class SampleBean {
+
+        public String hello(String name) {
+            return "Hello " + name;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
index 6ab9c5e..186a9b5 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java
@@ -126,6 +126,46 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService {
         return null;
     }
 
+    @Override
+    public Publisher<Exchange> publishURI(String uri) {
+        return null;
+    }
+
+    @Override
+    public <T> Publisher<T> publishURI(String uri, Class<T> type) {
+        return null;
+    }
+
+    @Override
+    public Publisher<Exchange> requestURI(String uri, Object data) {
+        return null;
+    }
+
+    @Override
+    public Function<?, ? extends Publisher<Exchange>> requestURI(String uri) {
+        return null;
+    }
+
+    @Override
+    public <T> Publisher<T> requestURI(String uri, Object data, Class<T> type) {
+        return null;
+    }
+
+    @Override
+    public <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type) {
+        return null;
+    }
+
+    @Override
+    public void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor) {
+
+    }
+
+    @Override
+    public <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
+
+    }
+
     public String getName() {
         return name;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java
new file mode 100644
index 0000000..0fb4b6e
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.support;
+
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+
+public class ReactiveStreamsTestSupport extends CamelTestSupport {
+
+    protected CamelReactiveStreamsService camel;
+
+    @Before
+    public void initReactiveStreamService() {
+        this.camel = CamelReactiveStreams.get(context);
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+}


[2/2] camel git commit: CAMEL-10612: fixing unwrap stream processor with std and empty data

Posted by nf...@apache.org.
CAMEL-10612: fixing unwrap stream processor with std and empty data


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a7e838d9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a7e838d9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a7e838d9

Branch: refs/heads/master
Commit: a7e838d963b76ef0d943efef03f5d4c6eaaf4624
Parents: 0f9b93b
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Feb 3 10:01:17 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Feb 3 13:46:47 2017 +0100

----------------------------------------------------------------------
 .../streams/util/UnwrapStreamProcessor.java     |   6 +-
 .../reactive/streams/BeanCallTest.java          | 124 ++++++++++++++++++-
 2 files changed, 126 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a7e838d9/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
index 6800ce0..3fb1a8a 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java
@@ -78,8 +78,12 @@ public class UnwrapStreamProcessor implements AsyncProcessor {
                 }
 
             });
+
+            return false;
         }
-        return false;
+
+        callback.done(true);
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/a7e838d9/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
index 5a532ae..1b97382 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.reactive.streams;
 
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
 import io.reactivex.Flowable;
 
 import org.apache.camel.Exchange;
@@ -43,7 +47,6 @@ public class BeanCallTest extends CamelTestSupport {
                 from("direct:num")
                         .bean(BeanCallTest.this, "processBody")
                         .process(new UnwrapStreamProcessor()) // Can be removed?
-                        .split().body()
                         .to("mock:endpoint");
 
                 from("direct:handle")
@@ -76,7 +79,6 @@ public class BeanCallTest extends CamelTestSupport {
                 from("direct:num")
                         .bean(BeanCallTest.this, "processBodyWrongType")
                         .process(new UnwrapStreamProcessor()) // Can be removed?
-                        .split().body()
                         .to("mock:endpoint");
 
                 from("direct:handle")
@@ -108,7 +110,6 @@ public class BeanCallTest extends CamelTestSupport {
                 from("direct:num")
                         .bean(BeanCallTest.this, "processHeader")
                         .process(new UnwrapStreamProcessor()) // Can be removed?
-                        .split().body()
                         .to("mock:endpoint");
 
                 from("direct:handle")
@@ -129,6 +130,110 @@ public class BeanCallTest extends CamelTestSupport {
         assertEquals("HelloHeader 2", exchange.getIn().getBody());
     }
 
+    @Test
+    public void beanCallEmptyPublisherTest() throws Exception {
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(Throwable.class).to("direct:handle").handled(true);
+
+                from("direct:num")
+                        .bean(BeanCallTest.this, "processBodyEmpty")
+                        .process(new UnwrapStreamProcessor()) // Can be removed?
+                        .to("mock:endpoint");
+
+                from("direct:handle")
+                        .setBody().constant("ERR")
+                        .to("mock:endpoint");
+
+            }
+        }.addRoutesToCamelContext(context);
+
+        MockEndpoint mock = getMockEndpoint("mock:endpoint");
+        mock.expectedMessageCount(1);
+
+        context.start();
+
+        template.sendBody("direct:num", 1);
+        mock.assertIsSatisfied();
+
+        Exchange exchange = mock.getExchanges().get(0);
+        Object body = exchange.getIn().getBody();
+        assertEquals(new Integer(1), body); // unchanged
+    }
+
+    @Test
+    public void beanCallTwoElementsTest() throws Exception {
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(Throwable.class).to("direct:handle").handled(true);
+
+                from("direct:num")
+                        .bean(BeanCallTest.this, "processBodyTwoItems")
+                        .process(new UnwrapStreamProcessor()) // Can be removed?
+                        .to("mock:endpoint");
+
+                from("direct:handle")
+                        .setBody().constant("ERR")
+                        .to("mock:endpoint");
+
+            }
+        }.addRoutesToCamelContext(context);
+
+        MockEndpoint mock = getMockEndpoint("mock:endpoint");
+        mock.expectedMessageCount(1);
+
+        context.start();
+
+        template.sendBody("direct:num", 1);
+        mock.assertIsSatisfied();
+
+        Exchange exchange = mock.getExchanges().get(0);
+        Object body = exchange.getIn().getBody();
+        assertTrue(body instanceof Collection);
+        @SuppressWarnings("unchecked")
+        List<String> data = new LinkedList<>((Collection<String>) body);
+        assertListSize(data, 2);
+        assertEquals("HelloBody 1", data.get(0));
+        assertEquals("HelloBody 1", data.get(1));
+    }
+
+    @Test
+    public void beanCallStdReturnTypeTest() throws Exception {
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(Throwable.class).to("direct:handle").handled(true);
+
+                from("direct:num")
+                        .bean(BeanCallTest.this, "processBodyStd")
+                        .process(new UnwrapStreamProcessor()) // Can be removed?
+                        .to("mock:endpoint");
+
+                from("direct:handle")
+                        .setBody().constant("ERR")
+                        .to("mock:endpoint");
+
+            }
+        }.addRoutesToCamelContext(context);
+
+        MockEndpoint mock = getMockEndpoint("mock:endpoint");
+        mock.expectedMessageCount(1);
+
+        context.start();
+
+        template.sendBody("direct:num", 1);
+        mock.assertIsSatisfied();
+
+        Exchange exchange = mock.getExchanges().get(0);
+        Object body = exchange.getIn().getBody();
+        assertEquals("Hello", body);
+    }
+
     public Publisher<String> processBody(Publisher<Integer> data) {
         return Flowable.fromPublisher(data)
                 .map(l -> "HelloBody " + l);
@@ -144,6 +249,19 @@ public class BeanCallTest extends CamelTestSupport {
                 .map(l -> "HelloHeader " + l);
     }
 
+    public Publisher<String> processBodyTwoItems(Publisher<Integer> data) {
+        return Flowable.fromPublisher(data).mergeWith(data)
+                .map(l -> "HelloBody " + l);
+    }
+
+    public Publisher<String> processBodyEmpty(Publisher<Integer> data) {
+        return Flowable.empty();
+    }
+
+    public String processBodyStd(Publisher<Integer> data) {
+        return "Hello";
+    }
+
     @Override
     public boolean isUseRouteBuilder() {
         return false;