You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/02/01 15:48:33 UTC
camel git commit: CAMEL-10612: adding Publisher types for bean call
Repository: camel
Updated Branches:
refs/heads/master e16315825 -> 267f065fb
CAMEL-10612: adding Publisher types for bean call
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/267f065f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/267f065f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/267f065f
Branch: refs/heads/master
Commit: 267f065fb9640a95164404795a3094d978e313cd
Parents: e163158
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Wed Feb 1 16:47:56 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Wed Feb 1 16:48:09 2017 +0100
----------------------------------------------------------------------
.../streams/ReactiveStreamsConverter.java | 62 ++++++++
.../reactive/streams/util/MonoPublisher.java | 64 ++++++++
.../streams/util/UnwrappingStreamProcessor.java | 78 ++++++++++
.../services/org/apache/camel/TypeConverter | 18 +++
.../reactive/streams/BeanCallTest.java | 148 +++++++++++++++++++
5 files changed, 370 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
new file mode 100644
index 0000000..23f49c9
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.FallbackConverter;
+import org.apache.camel.Processor;
+import org.apache.camel.component.bean.BeanInfo;
+import org.apache.camel.component.bean.BeanProcessor;
+import org.apache.camel.component.bean.ConstantBeanHolder;
+import org.apache.camel.component.reactive.streams.util.MonoPublisher;
+import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor;
+import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.reactivestreams.Publisher;
+
+
+/**
+ * Camel type converter that wraps arbitrary values into reactive streams
+ * {@link Publisher} instances.
+ */
+@Converter
+public final class ReactiveStreamsConverter implements CamelContextAware {
+
+    private CamelContext camelContext;
+
+    public ReactiveStreamsConverter() {
+    }
+
+    /**
+     * Fallback conversion that wraps {@code value} into a single-item publisher.
+     *
+     * @param type the requested target type
+     * @param exchange the current exchange (not used by this converter)
+     * @param value the value to wrap
+     * @param registry the type converter registry (not used by this converter)
+     * @return a {@link MonoPublisher} emitting {@code value} when a {@link Publisher}
+     *         can be assigned to {@code type}; {@code null} otherwise
+     */
+    @FallbackConverter
+    public Object convertToPublisher(Class<?> type, Exchange exchange, Object value, TypeConverterRegistry registry) {
+        // Only handle requests that a Publisher instance can satisfy.
+        if (!type.isAssignableFrom(Publisher.class)) {
+            return null;
+        }
+        return new MonoPublisher<>(value);
+    }
+
+    /**
+     * @return the Camel context previously injected through {@link #setCamelContext(CamelContext)}
+     */
+    @Override
+    public CamelContext getCamelContext() {
+        return this.camelContext;
+    }
+
+    /**
+     * @param camelContext the Camel context to remember
+     */
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
new file mode 100644
index 0000000..d0be491
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.util;
+
+import java.util.Objects;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * A publisher that publishes a given item, then completes.
+ * <p>
+ * Every subscriber receives its own one-shot subscription: the first call to
+ * {@code request} with a positive demand delivers the item followed by the
+ * completion signal; a non-positive demand is reported through {@code onError}.
+ */
+public class MonoPublisher<T> implements Publisher<T> {
+
+    private final T item;
+
+    /**
+     * @param item the single item delivered to each subscriber
+     */
+    public MonoPublisher(T item) {
+        this.item = item;
+    }
+
+    /**
+     * Hands a new one-shot subscription to the given subscriber.
+     *
+     * @param subscriber the subscriber to signal
+     * @throws NullPointerException if {@code subscriber} is {@code null}
+     */
+    @Override
+    public void subscribe(Subscriber<? super T> subscriber) {
+        Objects.requireNonNull(subscriber, "subscriber must not be null");
+        subscriber.onSubscribe(new Subscription() {
+
+            private boolean terminated;
+
+            @Override
+            public void request(long l) {
+                if (terminated) {
+                    // Reactive Streams rules 3.6 and 3.7: requests made after
+                    // cancellation or termination must be no-ops, not failures.
+                    return;
+                }
+                // Flip the flag before signalling, so that a re-entrant request()
+                // issued from within onNext() cannot emit the item a second time.
+                terminated = true;
+
+                if (l <= 0) {
+                    subscriber.onError(new IllegalArgumentException("3.9"));
+                } else {
+                    subscriber.onNext(item);
+                    subscriber.onComplete();
+                }
+            }
+
+            @Override
+            public void cancel() {
+                terminated = true;
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
new file mode 100644
index 0000000..fa85fd2
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.util;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * A Processor that converts a Publisher into its content asynchronously.
+ * <p>
+ * When the incoming message body is a {@link Publisher}, all the items it emits
+ * are collected into a {@link List} that replaces the body once the stream
+ * terminates. If the stream fails, the items received so far become the body and
+ * the failure is set as the exchange exception. Any other body is left untouched.
+ */
+public class UnwrappingStreamProcessor implements AsyncProcessor {
+
+    /**
+     * Unwraps the publisher carried in the message body, if any.
+     *
+     * @param exchange the exchange whose input body is inspected
+     * @param callback notified when processing of the exchange is done
+     * @return {@code true} when the body is not a publisher and the exchange was
+     *         completed synchronously; {@code false} when completion is signalled
+     *         through the callback by the stream's terminal signal
+     */
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        Object content = exchange.getIn().getBody();
+        if (!(content instanceof Publisher)) {
+            // Nothing to unwrap: complete synchronously. Returning false here without
+            // ever invoking the callback would leave the exchange waiting forever.
+            callback.done(true);
+            return true;
+        }
+
+        Publisher<?> pub = (Publisher<?>) content;
+        List<Object> data = new LinkedList<>();
+
+        pub.subscribe(new Subscriber<Object>() {
+
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                // Unbounded demand: every item of the stream is collected.
+                subscription.request(Long.MAX_VALUE);
+            }
+
+            @Override
+            public void onNext(Object o) {
+                data.add(o);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                exchange.getIn().setBody(data);
+                exchange.setException(throwable);
+                callback.done(false);
+            }
+
+            @Override
+            public void onComplete() {
+                exchange.getIn().setBody(data);
+                callback.done(false);
+            }
+
+        });
+        return false;
+    }
+
+    /**
+     * Synchronous variant, delegating to the asynchronous one through
+     * {@link AsyncProcessorHelper#process(AsyncProcessor, Exchange)}.
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
new file mode 100644
index 0000000..1167eeb
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.camel.component.reactive.streams.ReactiveStreamsConverter
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
new file mode 100644
index 0000000..1f05a00
--- /dev/null
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import io.reactivex.Flowable;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Header;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+
+/**
+ * Tests routes calling bean methods that accept and return {@link Publisher}s.
+ */
+public class BeanCallTest extends CamelTestSupport {
+
+    @Test
+    public void beanCallTest() throws Exception {
+        addRoutesCalling("processBody");
+
+        MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
+        endpoint.expectedMessageCount(1);
+
+        context.start();
+
+        template.sendBody("direct:num", 1);
+        endpoint.assertIsSatisfied();
+
+        Exchange received = endpoint.getExchanges().get(0);
+        assertEquals("HelloBody 1", received.getIn().getBody());
+    }
+
+    @Test
+    public void beanCallWithErrorTest() throws Exception {
+        addRoutesCalling("processBodyWrongType");
+
+        MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
+        endpoint.expectedMessageCount(1);
+
+        context.start();
+
+        template.sendBody("direct:num", 1);
+        endpoint.assertIsSatisfied();
+
+        Exchange received = endpoint.getExchanges().get(0);
+        assertEquals("ERR", received.getIn().getBody());
+    }
+
+    @Test
+    public void beanCallHeaderMappingTest() throws Exception {
+        addRoutesCalling("processHeader");
+
+        MockEndpoint endpoint = getMockEndpoint("mock:endpoint");
+        endpoint.expectedMessageCount(1);
+
+        context.start();
+
+        template.sendBodyAndHeader("direct:num", 1, "myheader", 2);
+        endpoint.assertIsSatisfied();
+
+        Exchange received = endpoint.getExchanges().get(0);
+        assertEquals("HelloHeader 2", received.getIn().getBody());
+    }
+
+    /**
+     * Adds the routes shared by all the tests to the Camel context: "direct:num"
+     * invokes the given method of this test instance, unwraps the result and sends
+     * each element to the mock endpoint, while any failure is handled by routing a
+     * constant "ERR" body to the same mock endpoint.
+     *
+     * @param beanMethod name of the method of this class invoked by the route
+     */
+    private void addRoutesCalling(String beanMethod) throws Exception {
+        RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(Throwable.class).to("direct:handle").handled(true);
+
+                from("direct:num")
+                    .bean(BeanCallTest.this, beanMethod)
+                    .process(new UnwrappingStreamProcessor()).split().body() // Can be removed?
+                    .to("mock:endpoint");
+
+                from("direct:handle")
+                    .setBody().constant("ERR")
+                    .to("mock:endpoint");
+            }
+        };
+        routes.addRoutesToCamelContext(context);
+    }
+
+    public Publisher<String> processBody(Publisher<Integer> data) {
+        return Flowable.fromPublisher(data)
+            .map(l -> "HelloBody " + l);
+    }
+
+    // Declares a parameter type that does not match the Integer body sent by the test.
+    public Publisher<String> processBodyWrongType(Publisher<BeanCallTest> data) {
+        return Flowable.fromPublisher(data)
+            .map(l -> "HelloBody " + l);
+    }
+
+    public Publisher<String> processHeader(@Header("myheader") Publisher<Integer> data) {
+        return Flowable.fromPublisher(data)
+            .map(l -> "HelloHeader " + l);
+    }
+
+    // Routes are added explicitly by each test, so the context is started manually.
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}