You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/02/01 15:48:33 UTC

camel git commit: CAMEL-10612: adding Publisher types for bean call

Repository: camel
Updated Branches:
  refs/heads/master e16315825 -> 267f065fb


CAMEL-10612: adding Publisher types for bean call


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/267f065f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/267f065f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/267f065f

Branch: refs/heads/master
Commit: 267f065fb9640a95164404795a3094d978e313cd
Parents: e163158
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Wed Feb 1 16:47:56 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Wed Feb 1 16:48:09 2017 +0100

----------------------------------------------------------------------
 .../streams/ReactiveStreamsConverter.java       |  62 ++++++++
 .../reactive/streams/util/MonoPublisher.java    |  64 ++++++++
 .../streams/util/UnwrappingStreamProcessor.java |  78 ++++++++++
 .../services/org/apache/camel/TypeConverter     |  18 +++
 .../reactive/streams/BeanCallTest.java          | 148 +++++++++++++++++++
 5 files changed, 370 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
new file mode 100644
index 0000000..23f49c9
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.FallbackConverter;
+import org.apache.camel.Processor;
+import org.apache.camel.component.bean.BeanInfo;
+import org.apache.camel.component.bean.BeanProcessor;
+import org.apache.camel.component.bean.ConstantBeanHolder;
+import org.apache.camel.component.reactive.streams.util.MonoPublisher;
+import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor;
+import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.reactivestreams.Publisher;
+
+
+/**
+ * Camel type converter that wraps an arbitrary value into a reactive streams
+ * {@link Publisher}.
+ */
+@Converter
+public final class ReactiveStreamsConverter implements CamelContextAware {
+
+    private CamelContext camelContext;
+
+    public ReactiveStreamsConverter() {
+    }
+
+    /**
+     * Fallback conversion towards {@link Publisher}.
+     *
+     * @param type     the type the caller wants to obtain
+     * @param exchange the current exchange (not used by this converter)
+     * @param value    the value to wrap
+     * @param registry the type converter registry (not used by this converter)
+     * @return a {@link MonoPublisher} emitting {@code value} when {@code type}
+     *         is assignable from {@link Publisher}; {@code null} otherwise
+     */
+    @FallbackConverter
+    public Object convertToPublisher(Class<?> type, Exchange exchange, Object value, TypeConverterRegistry registry) {
+        final boolean publisherRequested = type.isAssignableFrom(Publisher.class);
+        if (!publisherRequested) {
+            return null;
+        }
+        return new MonoPublisher<>(value);
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return this.camelContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
new file mode 100644
index 0000000..d0be491
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.util;
+
+import java.util.Objects;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * A publisher that publishes a single given item to each subscriber, then completes.
+ * <p>
+ * The item is emitted on the first call to {@code request} with a positive
+ * demand. Every subscriber gets its own subscription, so each one receives
+ * the item independently.
+ */
+public class MonoPublisher<T> implements Publisher<T> {
+
+    private final T item;
+
+    /**
+     * @param item the single item delivered to every subscriber
+     */
+    public MonoPublisher(T item) {
+        this.item = item;
+    }
+
+    /**
+     * Hands a new one-shot subscription to the given subscriber.
+     *
+     * @param subscriber the subscriber that will receive the item
+     * @throws NullPointerException if {@code subscriber} is {@code null}
+     */
+    @Override
+    public void subscribe(Subscriber<? super T> subscriber) {
+        Objects.requireNonNull(subscriber, "subscriber must not be null");
+        subscriber.onSubscribe(new Subscription() {
+
+            private boolean terminated;
+
+            @Override
+            public void request(long l) {
+                if (terminated) {
+                    // Reactive Streams rules 3.6 and 3.16: requests made after
+                    // cancellation or completion are no-ops and must return
+                    // normally instead of throwing.
+                    return;
+                }
+                // Flag the subscription before signalling, so that a re-entrant
+                // request() issued from within onNext() cannot emit the item twice.
+                terminated = true;
+
+                if (l <= 0) {
+                    // Rule 3.9: non-positive demand is reported through onError.
+                    subscriber.onError(new IllegalArgumentException("3.9"));
+                } else {
+                    subscriber.onNext(item);
+                    subscriber.onComplete();
+                }
+            }
+
+            @Override
+            public void cancel() {
+                terminated = true;
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
new file mode 100644
index 0000000..fa85fd2
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.util;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * A Processor that converts a Publisher into its content asynchronously.
+ * <p>
+ * When the incoming message body is a {@link Publisher}, the processor
+ * subscribes to it, collects every emitted item and, once the stream
+ * terminates, replaces the body with the list of collected items. Any other
+ * body is left untouched.
+ */
+public class UnwrappingStreamProcessor implements AsyncProcessor {
+
+    /**
+     * Unwraps the publisher found in the message body, if any.
+     *
+     * @param exchange the exchange whose "in" body may hold a {@link Publisher}
+     * @param callback notified when processing of the exchange is done
+     * @return {@code true} when the body is not a publisher and the exchange
+     *         was completed synchronously; {@code false} when completion is
+     *         signalled through {@code callback} by the stream subscriber
+     */
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        Object content = exchange.getIn().getBody();
+        if (!(content instanceof Publisher)) {
+            // Nothing to unwrap. The callback must still be notified, otherwise
+            // the exchange would wait forever for an asynchronous completion
+            // that never comes.
+            callback.done(true);
+            return true;
+        }
+
+        Publisher<?> pub = Publisher.class.cast(content);
+
+        List<Object> data = new LinkedList<>();
+
+        pub.subscribe(new Subscriber<Object>() {
+
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                // Unbounded demand: all items of the stream are collected.
+                subscription.request(Long.MAX_VALUE);
+            }
+
+            @Override
+            public void onNext(Object o) {
+                data.add(o);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // Keep the items received before the failure as body and
+                // report the failure on the exchange.
+                exchange.getIn().setBody(data);
+                exchange.setException(throwable);
+                callback.done(false);
+            }
+
+            @Override
+            public void onComplete() {
+                exchange.getIn().setBody(data);
+                callback.done(false);
+            }
+
+        });
+        return false;
+    }
+
+    /**
+     * Synchronous variant, delegating to the asynchronous implementation
+     * through {@link AsyncProcessorHelper}.
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
new file mode 100644
index 0000000..1167eeb
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.component.reactive.streams.ReactiveStreamsConverter
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
new file mode 100644
index 0000000..1f05a00
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Header;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+
+/**
+ * Tests calling bean methods that accept and return a {@link Publisher} from a
+ * Camel route, with the argument taken from the message body or from a header.
+ */
+public class BeanCallTest extends CamelTestSupport {
+
+    @Test
+    public void beanCallTest() throws Exception {
+        MockEndpoint mock = startRoutesCalling("processBody");
+
+        template.sendBody("direct:num", 1);
+        mock.assertIsSatisfied();
+
+        assertEquals("HelloBody 1", firstBodyOf(mock));
+    }
+
+    @Test
+    public void beanCallWithErrorTest() throws Exception {
+        MockEndpoint mock = startRoutesCalling("processBodyWrongType");
+
+        template.sendBody("direct:num", 1);
+        mock.assertIsSatisfied();
+
+        // The exception route is expected to deliver the constant "ERR" body.
+        assertEquals("ERR", firstBodyOf(mock));
+    }
+
+    @Test
+    public void beanCallHeaderMappingTest() throws Exception {
+        MockEndpoint mock = startRoutesCalling("processHeader");
+
+        template.sendBodyAndHeader("direct:num", 1, "myheader", 2);
+        mock.assertIsSatisfied();
+
+        assertEquals("HelloHeader 2", firstBodyOf(mock));
+    }
+
+    /**
+     * Registers the test routes, prepares the mock endpoint to expect exactly
+     * one message and starts the Camel context.
+     *
+     * @param beanMethod name of the method of this class invoked by the route
+     * @return the mock endpoint receiving the route output
+     */
+    private MockEndpoint startRoutesCalling(String beanMethod) throws Exception {
+        new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(Throwable.class).to("direct:handle").handled(true);
+
+                // Open question from the original author: can the explicit
+                // unwrap + split step be removed?
+                from("direct:num")
+                        .bean(BeanCallTest.this, beanMethod)
+                        .process(new UnwrappingStreamProcessor()).split().body()
+                        .to("mock:endpoint");
+
+                from("direct:handle")
+                        .setBody().constant("ERR")
+                        .to("mock:endpoint");
+            }
+        }.addRoutesToCamelContext(context);
+
+        MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
+        endpoint.expectedMessageCount(1);
+
+        context.start();
+        return endpoint;
+    }
+
+    /**
+     * Returns the "in" body of the first exchange received by the given mock.
+     */
+    private Object firstBodyOf(MockEndpoint endpoint) {
+        Exchange received = endpoint.getExchanges().get(0);
+        return received.getIn().getBody();
+    }
+
+    public Publisher<String> processBody(Publisher<Integer> data) {
+        Flowable<Integer> numbers = Flowable.fromPublisher(data);
+        return numbers.map(n -> "HelloBody " + n);
+    }
+
+    public Publisher<String> processBodyWrongType(Publisher<BeanCallTest> data) {
+        Flowable<BeanCallTest> items = Flowable.fromPublisher(data);
+        return items.map(t -> "HelloBody " + t);
+    }
+
+    public Publisher<String> processHeader(@Header("myheader") Publisher<Integer> data) {
+        Flowable<Integer> numbers = Flowable.fromPublisher(data);
+        return numbers.map(n -> "HelloHeader " + n);
+    }
+
+    /**
+     * Routes are added by each test itself, so the default route builder of
+     * the test support class is disabled.
+     */
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}