You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/09 08:26:27 UTC

camel git commit: CAMEL-11123: Rename Impl to Default, which is the naming convention we typically use in Camel.

Repository: camel
Updated Branches:
  refs/heads/master cf6a7414b -> 96bbc91cf


CAMEL-11123: Rename Impl to Default, which is the naming convention we typically use in Camel.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/96bbc91c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/96bbc91c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/96bbc91c

Branch: refs/heads/master
Commit: 96bbc91cfc1abac48a659493354036cb6ccc01b2
Parents: cf6a741
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 9 10:25:44 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 9 10:25:44 2017 +0200

----------------------------------------------------------------------
 .../engine/CamelReactiveStreamsServiceImpl.java | 418 -------------------
 .../DefaultCamelReactiveStreamsService.java     | 418 +++++++++++++++++++
 .../camel/reactive-streams/default-service      |   2 +-
 .../streams/CamelReactiveStreamsTest.java       |   6 +-
 4 files changed, 422 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
deleted file mode 100644
index 3d23ce7..0000000
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.reactive.streams.engine;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.api.management.ManagedOperation;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
-import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
-import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
-import org.apache.camel.component.reactive.streams.api.DispatchCallback;
-import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
-import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
-import org.apache.camel.component.reactive.streams.util.MonoPublisher;
-import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.spi.Synchronization;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-/**
- * The default implementation of the reactive streams service.
- */
-@ManagedResource(description = "Managed CamelReactiveStreamsService")
-public class CamelReactiveStreamsServiceImpl extends ServiceSupport implements CamelReactiveStreamsService {
-
-    private CamelContext context;
-
-    private ExecutorService workerPool;
-
-    private final Map<String, CamelPublisher> publishers = new ConcurrentHashMap<>();
-
-    private final Map<String, CamelSubscriber> subscribers = new ConcurrentHashMap<>();
-
-    private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>();
-
-    private final Map<String, String> requestedUriToStream = new ConcurrentHashMap<>();
-
-    public CamelReactiveStreamsServiceImpl() {
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        ReactiveStreamsComponent component = context.getComponent("reactive-streams", ReactiveStreamsComponent.class);
-        ReactiveStreamsEngineConfiguration config = component.getInternalEngineConfiguration();
-        this.workerPool = context.getExecutorServiceManager().newThreadPool(this, config.getThreadPoolName(), config.getThreadPoolMinSize(), config.getThreadPoolMaxSize());
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (this.workerPool != null) {
-            context.getExecutorServiceManager().shutdownNow(this.workerPool);
-            this.workerPool = null;
-        }
-    }
-
-    @Override
-    public Publisher<Exchange> fromStream(String name) {
-        return new UnwrappingPublisher<>(getPayloadPublisher(name));
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T> Publisher<T> fromStream(String name, Class<T> cls) {
-        if (Exchange.class.equals(cls)) {
-            return (Publisher<T>) fromStream(name);
-        }
-
-        return new ConvertingPublisher<T>(fromStream(name), cls);
-    }
-
-    @Override
-    public CamelSubscriber streamSubscriber(String name) {
-        subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name));
-        return subscribers.get(name);
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T> Subscriber<T> streamSubscriber(String name, Class<T> type) {
-        if (Exchange.class.equals(type)) {
-            return (Subscriber<T>) streamSubscriber(name);
-        }
-
-        return new ConvertingSubscriber<T>(streamSubscriber(name), getCamelContext());
-    }
-
-    @Override
-    public void sendCamelExchange(String name, Exchange exchange, DispatchCallback<Exchange> callback) {
-        StreamPayload<Exchange> payload = new StreamPayload<>(exchange, callback);
-        getPayloadPublisher(name).publish(payload);
-    }
-
-    @Override
-    public Publisher<Exchange> toStream(String name, Object data) {
-        Exchange exchange = convertToExchange(data);
-        return doRequest(name, exchange);
-    }
-
-    @Override
-    public Function<?, ? extends Publisher<Exchange>> toStream(String name) {
-        return data -> toStream(name, data);
-    }
-
-    @Override
-    public <T> Publisher<T> toStream(String name, Object data, Class<T> type) {
-        return new ConvertingPublisher<>(toStream(name, data), type);
-    }
-
-    protected Publisher<Exchange> doRequest(String name, Exchange data) {
-        ReactiveStreamsConsumer consumer = streamSubscriber(name).getConsumer();
-        if (consumer == null) {
-            throw new IllegalStateException("No consumers attached to the stream " + name);
-        }
-
-        DelayedMonoPublisher<Exchange> publisher = new DelayedMonoPublisher<>(this.workerPool);
-
-        data.addOnCompletion(new Synchronization() {
-            @Override
-            public void onComplete(Exchange exchange) {
-                publisher.setData(exchange);
-            }
-
-            @Override
-            public void onFailure(Exchange exchange) {
-                Throwable throwable = exchange.getException();
-                if (throwable == null) {
-                    throwable = new IllegalStateException("Unknown Exception");
-                }
-                publisher.setException(throwable);
-            }
-        });
-
-        consumer.process(data, doneSync -> {
-        });
-
-        return publisher;
-    }
-
-    @Override
-    public <T> Function<Object, Publisher<T>> toStream(String name, Class<T> type) {
-        return data -> toStream(name, data, type);
-    }
-
-    private CamelPublisher getPayloadPublisher(String name) {
-        publishers.computeIfAbsent(name, n -> new CamelPublisher(this.workerPool, this.context, n));
-        return publishers.get(name);
-    }
-
-    @Override
-    public Publisher<Exchange> from(String uri) {
-        publishedUriToStream.computeIfAbsent(uri, u -> {
-            try {
-                String uuid = context.getUuidGenerator().generateUuid();
-                new RouteBuilder() {
-                    @Override
-                    public void configure() throws Exception {
-                        from(u)
-                                .to("reactive-streams:" + uuid);
-                    }
-                }.addRoutesToCamelContext(context);
-
-                return uuid;
-            } catch (Exception e) {
-                throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e);
-            }
-        });
-        return fromStream(publishedUriToStream.get(uri));
-    }
-
-    @Override
-    public <T> Publisher<T> from(String uri, Class<T> type) {
-        return new ConvertingPublisher<T>(from(uri), type);
-    }
-
-    @Override
-    public Subscriber<Exchange> subscriber(String uri) {
-        try {
-            String uuid = context.getUuidGenerator().generateUuid();
-            new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("reactive-streams:" + uuid)
-                            .to(uri);
-                }
-            }.addRoutesToCamelContext(context);
-
-            return streamSubscriber(uuid);
-        } catch (Exception e) {
-            throw new IllegalStateException("Unable to create source reactive stream towards direct URI: " + uri, e);
-        }
-    }
-
-    @Override
-    public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
-        return new ConvertingSubscriber<T>(subscriber(uri), context);
-    }
-
-    @Override
-    public Publisher<Exchange> to(String uri, Object data) {
-        requestedUriToStream.computeIfAbsent(uri, u -> {
-            try {
-                String uuid = context.getUuidGenerator().generateUuid();
-                new RouteBuilder() {
-                    @Override
-                    public void configure() throws Exception {
-                        from("reactive-streams:" + uuid)
-                                .to(u);
-                    }
-                }.addRoutesToCamelContext(context);
-
-                return uuid;
-            } catch (Exception e) {
-                throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e);
-            }
-        });
-        return toStream(requestedUriToStream.get(uri), data);
-    }
-
-    @Override
-    public Function<Object, Publisher<Exchange>> to(String uri) {
-        return data -> to(uri, data);
-    }
-
-    @Override
-    public <T> Publisher<T> to(String uri, Object data, Class<T> type) {
-        return new ConvertingPublisher<T>(to(uri, data), type);
-    }
-
-    @Override
-    public <T> Function<Object, Publisher<T>> to(String uri, Class<T> type) {
-        return data -> to(uri, data, type);
-    }
-
-    @Override
-    public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) {
-        try {
-            new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from(uri)
-                            .process(exchange -> {
-                                Exchange copy = exchange.copy();
-                                Object result = processor.apply(new MonoPublisher<>(copy));
-                                exchange.getIn().setBody(result);
-                            })
-                            .process(new UnwrapStreamProcessor());
-                }
-            }.addRoutesToCamelContext(context);
-        } catch (Exception e) {
-            throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + uri, e);
-        }
-    }
-
-    @Override
-    public <T> void process(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
-        process(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type)));
-    }
-
-    @Override
-    public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
-        CamelSubscriber subscriber = streamSubscriber(name);
-        subscriber.attachConsumer(consumer);
-        return subscriber;
-    }
-
-    @Override
-    public void detachCamelConsumer(String name) {
-        streamSubscriber(name).detachConsumer();
-    }
-
-    @Override
-    public void attachCamelProducer(String name, ReactiveStreamsProducer producer) {
-        getPayloadPublisher(name).attachProducer(producer);
-    }
-
-    @Override
-    public void detachCamelProducer(String name) {
-        getPayloadPublisher(name).detachProducer();
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.context = camelContext;
-    }
-
-    @Override
-    public CamelContext getCamelContext() {
-        return this.context;
-    }
-
-    private Exchange convertToExchange(Object data) {
-        Exchange exchange;
-        if (data instanceof Exchange) {
-            exchange = (Exchange) data;
-        } else {
-            exchange = new DefaultExchange(context);
-            exchange.setPattern(ExchangePattern.InOut);
-            exchange.getIn().setBody(data);
-        }
-
-        return exchange;
-    }
-
-    @ManagedOperation(description = "Information about Camel Reactive subscribers")
-    public TabularData camelSubscribers() {
-        try {
-            final TabularData answer = new TabularDataSupport(subscriptionsTabularType());
-
-            subscribers.forEach((k, v) -> {
-                try {
-                    String name = k;
-                    long inflight = v.getInflightCount();
-                    long requested = v.getRequested();
-                    long bufferSize = v.getBufferSize();
-                    String backpressure = v.getBackpressureStrategy() != null ? v.getBackpressureStrategy().name() : "";
-
-                    CompositeType ct = subscriptionsCompositeType();
-                    CompositeData data = new CompositeDataSupport(ct,
-                        new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
-                        new Object[] {name, inflight, requested, bufferSize, backpressure});
-                    answer.put(data);
-                } catch (Exception e) {
-                    throw ObjectHelper.wrapRuntimeCamelException(e);
-                }
-            });
-
-            return answer;
-        } catch (Exception e) {
-            throw ObjectHelper.wrapRuntimeCamelException(e);
-        }
-    }
-
-    @ManagedOperation(description = "Information about Camel Reactive publishers")
-    public TabularData camelPublishers() {
-        try {
-            final TabularData answer = new TabularDataSupport(publishersTabularType());
-
-            publishers.forEach((k, v) -> {
-                try {
-                    String name = k;
-                    int subscribers = v.getSubscriptionSize();
-
-                    // TODO: include more subscriber information, either as a nested table or flattern
-
-                    CompositeType ct = publishersCompositeType();
-                    CompositeData data = new CompositeDataSupport(ct,
-                        new String[] {"name", "subscribers"},
-                        new Object[] {name, subscribers});
-                    answer.put(data);
-                } catch (Exception e) {
-                    throw ObjectHelper.wrapRuntimeCamelException(e);
-                }
-            });
-
-            return answer;
-        } catch (Exception e) {
-            throw ObjectHelper.wrapRuntimeCamelException(e);
-        }
-    }
-
-    private static CompositeType subscriptionsCompositeType() throws OpenDataException {
-        return new CompositeType("subscriptions", "Subscriptions",
-            new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
-            new String[] {"Name", "Inflight", "Requested", "Buffer Size", "Back Pressure"},
-            new OpenType[] {SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING});
-    }
-
-    private static TabularType subscriptionsTabularType() throws OpenDataException {
-        CompositeType ct = subscriptionsCompositeType();
-        return new TabularType("subscriptions", "Information about Camel Reactive subscribers", ct, new String[]{"name"});
-    }
-
-    private static CompositeType publishersCompositeType() throws OpenDataException {
-        return new CompositeType("publishers", "Publishers",
-            new String[] {"name", "subscribers"},
-            new String[] {"Name", "Subscribers"},
-            new OpenType[] {SimpleType.STRING, SimpleType.INTEGER});
-    }
-
-    private static TabularType publishersTabularType() throws OpenDataException {
-        CompositeType ct = publishersCompositeType();
-        return new TabularType("publishers", "Information about Camel Reactive publishers", ct, new String[]{"name"});
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
new file mode 100644
index 0000000..3d8f968
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.engine;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.api.DispatchCallback;
+import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
+import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
+import org.apache.camel.component.reactive.streams.util.MonoPublisher;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+/**
+ * The default implementation of the reactive streams service.
+ */
+@ManagedResource(description = "Managed CamelReactiveStreamsService")
+public class DefaultCamelReactiveStreamsService extends ServiceSupport implements CamelReactiveStreamsService {
+
+    private CamelContext context;
+
+    private ExecutorService workerPool;
+
+    private final Map<String, CamelPublisher> publishers = new ConcurrentHashMap<>();
+
+    private final Map<String, CamelSubscriber> subscribers = new ConcurrentHashMap<>();
+
+    private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>();
+
+    private final Map<String, String> requestedUriToStream = new ConcurrentHashMap<>();
+
+    public DefaultCamelReactiveStreamsService() {
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ReactiveStreamsComponent component = context.getComponent("reactive-streams", ReactiveStreamsComponent.class);
+        ReactiveStreamsEngineConfiguration config = component.getInternalEngineConfiguration();
+        this.workerPool = context.getExecutorServiceManager().newThreadPool(this, config.getThreadPoolName(), config.getThreadPoolMinSize(), config.getThreadPoolMaxSize());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (this.workerPool != null) {
+            context.getExecutorServiceManager().shutdownNow(this.workerPool);
+            this.workerPool = null;
+        }
+    }
+
+    @Override
+    public Publisher<Exchange> fromStream(String name) {
+        return new UnwrappingPublisher<>(getPayloadPublisher(name));
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> Publisher<T> fromStream(String name, Class<T> cls) {
+        if (Exchange.class.equals(cls)) {
+            return (Publisher<T>) fromStream(name);
+        }
+
+        return new ConvertingPublisher<T>(fromStream(name), cls);
+    }
+
+    @Override
+    public CamelSubscriber streamSubscriber(String name) {
+        subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name));
+        return subscribers.get(name);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> Subscriber<T> streamSubscriber(String name, Class<T> type) {
+        if (Exchange.class.equals(type)) {
+            return (Subscriber<T>) streamSubscriber(name);
+        }
+
+        return new ConvertingSubscriber<T>(streamSubscriber(name), getCamelContext());
+    }
+
+    @Override
+    public void sendCamelExchange(String name, Exchange exchange, DispatchCallback<Exchange> callback) {
+        StreamPayload<Exchange> payload = new StreamPayload<>(exchange, callback);
+        getPayloadPublisher(name).publish(payload);
+    }
+
+    @Override
+    public Publisher<Exchange> toStream(String name, Object data) {
+        Exchange exchange = convertToExchange(data);
+        return doRequest(name, exchange);
+    }
+
+    @Override
+    public Function<?, ? extends Publisher<Exchange>> toStream(String name) {
+        return data -> toStream(name, data);
+    }
+
+    @Override
+    public <T> Publisher<T> toStream(String name, Object data, Class<T> type) {
+        return new ConvertingPublisher<>(toStream(name, data), type);
+    }
+
+    protected Publisher<Exchange> doRequest(String name, Exchange data) {
+        ReactiveStreamsConsumer consumer = streamSubscriber(name).getConsumer();
+        if (consumer == null) {
+            throw new IllegalStateException("No consumers attached to the stream " + name);
+        }
+
+        DelayedMonoPublisher<Exchange> publisher = new DelayedMonoPublisher<>(this.workerPool);
+
+        data.addOnCompletion(new Synchronization() {
+            @Override
+            public void onComplete(Exchange exchange) {
+                publisher.setData(exchange);
+            }
+
+            @Override
+            public void onFailure(Exchange exchange) {
+                Throwable throwable = exchange.getException();
+                if (throwable == null) {
+                    throwable = new IllegalStateException("Unknown Exception");
+                }
+                publisher.setException(throwable);
+            }
+        });
+
+        consumer.process(data, doneSync -> {
+        });
+
+        return publisher;
+    }
+
+    @Override
+    public <T> Function<Object, Publisher<T>> toStream(String name, Class<T> type) {
+        return data -> toStream(name, data, type);
+    }
+
+    private CamelPublisher getPayloadPublisher(String name) {
+        publishers.computeIfAbsent(name, n -> new CamelPublisher(this.workerPool, this.context, n));
+        return publishers.get(name);
+    }
+
+    @Override
+    public Publisher<Exchange> from(String uri) {
+        publishedUriToStream.computeIfAbsent(uri, u -> {
+            try {
+                String uuid = context.getUuidGenerator().generateUuid();
+                new RouteBuilder() {
+                    @Override
+                    public void configure() throws Exception {
+                        from(u)
+                                .to("reactive-streams:" + uuid);
+                    }
+                }.addRoutesToCamelContext(context);
+
+                return uuid;
+            } catch (Exception e) {
+                throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e);
+            }
+        });
+        return fromStream(publishedUriToStream.get(uri));
+    }
+
+    @Override
+    public <T> Publisher<T> from(String uri, Class<T> type) {
+        return new ConvertingPublisher<T>(from(uri), type);
+    }
+
+    @Override
+    public Subscriber<Exchange> subscriber(String uri) {
+        try {
+            String uuid = context.getUuidGenerator().generateUuid();
+            new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("reactive-streams:" + uuid)
+                            .to(uri);
+                }
+            }.addRoutesToCamelContext(context);
+
+            return streamSubscriber(uuid);
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to create source reactive stream towards direct URI: " + uri, e);
+        }
+    }
+
+    @Override
+    public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
+        return new ConvertingSubscriber<T>(subscriber(uri), context);
+    }
+
+    @Override
+    public Publisher<Exchange> to(String uri, Object data) {
+        requestedUriToStream.computeIfAbsent(uri, u -> {
+            try {
+                String uuid = context.getUuidGenerator().generateUuid();
+                new RouteBuilder() {
+                    @Override
+                    public void configure() throws Exception {
+                        from("reactive-streams:" + uuid)
+                                .to(u);
+                    }
+                }.addRoutesToCamelContext(context);
+
+                return uuid;
+            } catch (Exception e) {
+                throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e);
+            }
+        });
+        return toStream(requestedUriToStream.get(uri), data);
+    }
+
+    @Override
+    public Function<Object, Publisher<Exchange>> to(String uri) {
+        return data -> to(uri, data);
+    }
+
+    @Override
+    public <T> Publisher<T> to(String uri, Object data, Class<T> type) {
+        return new ConvertingPublisher<T>(to(uri, data), type);
+    }
+
+    @Override
+    public <T> Function<Object, Publisher<T>> to(String uri, Class<T> type) {
+        return data -> to(uri, data, type);
+    }
+
+    @Override
+    public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) {
+        try {
+            new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from(uri)
+                            .process(exchange -> {
+                                Exchange copy = exchange.copy();
+                                Object result = processor.apply(new MonoPublisher<>(copy));
+                                exchange.getIn().setBody(result);
+                            })
+                            .process(new UnwrapStreamProcessor());
+                }
+            }.addRoutesToCamelContext(context);
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + uri, e);
+        }
+    }
+
+    @Override
+    public <T> void process(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
+        process(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type)));
+    }
+
+    @Override
+    public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+        CamelSubscriber subscriber = streamSubscriber(name);
+        subscriber.attachConsumer(consumer);
+        return subscriber;
+    }
+
+    @Override
+    public void detachCamelConsumer(String name) {
+        streamSubscriber(name).detachConsumer();
+    }
+
+    @Override
+    public void attachCamelProducer(String name, ReactiveStreamsProducer producer) {
+        getPayloadPublisher(name).attachProducer(producer);
+    }
+
+    @Override
+    public void detachCamelProducer(String name) {
+        getPayloadPublisher(name).detachProducer();
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.context = camelContext;
+    }
+
+    /**
+     * Returns the Camel context currently stored in this service.
+     *
+     * @return the stored context
+     */
+    @Override
+    public CamelContext getCamelContext() {
+        return this.context;
+    }
+
+    /**
+     * Turns arbitrary data into an exchange.
+     * <p>
+     * An object that already is an {@code Exchange} is returned as-is; anything else becomes the in-message body
+     * of a new {@code DefaultExchange} created on this service's context with the {@code InOut} pattern.
+     *
+     * @param data the exchange, or the payload to wrap in a new exchange
+     * @return the given exchange, or a new one carrying {@code data} as body
+     */
+    private Exchange convertToExchange(Object data) {
+        if (data instanceof Exchange) {
+            return (Exchange) data;
+        }
+
+        Exchange wrapped = new DefaultExchange(context);
+        wrapped.setPattern(ExchangePattern.InOut);
+        wrapped.getIn().setBody(data);
+        return wrapped;
+    }
+
+    /**
+     * Builds a JMX table with one row per entry of the {@code subscribers} map.
+     * <p>
+     * Each row holds the subscriber name, its inflight count, requested count, buffer size and the name of its
+     * back pressure strategy (an empty string when the subscriber has no strategy).
+     *
+     * @return tabular data indexed by subscriber name
+     * @throws RuntimeException whatever {@code ObjectHelper.wrapRuntimeCamelException} produces if the table
+     *         type or a row cannot be built
+     */
+    @ManagedOperation(description = "Information about Camel Reactive subscribers")
+    public TabularData camelSubscribers() {
+        try {
+            final TabularData answer = new TabularDataSupport(subscriptionsTabularType());
+            // The row type is identical for every entry, so take it from the table once instead of
+            // rebuilding an equal CompositeType for each subscriber
+            final CompositeType ct = answer.getTabularType().getRowType();
+
+            subscribers.forEach((name, subscriber) -> {
+                try {
+                    // Typed locals keep the boxed values in line with the LONG/STRING item types of the row
+                    long inflight = subscriber.getInflightCount();
+                    long requested = subscriber.getRequested();
+                    long bufferSize = subscriber.getBufferSize();
+                    String backpressure = subscriber.getBackpressureStrategy() != null ? subscriber.getBackpressureStrategy().name() : "";
+
+                    CompositeData data = new CompositeDataSupport(ct,
+                        new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
+                        new Object[] {name, inflight, requested, bufferSize, backpressure});
+                    answer.put(data);
+                } catch (Exception e) {
+                    throw ObjectHelper.wrapRuntimeCamelException(e);
+                }
+            });
+
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    /**
+     * Builds a JMX table with one row per entry of the {@code publishers} map.
+     * <p>
+     * Each row holds the publisher name and its subscription size.
+     *
+     * @return tabular data indexed by publisher name
+     * @throws RuntimeException whatever {@code ObjectHelper.wrapRuntimeCamelException} produces if the table
+     *         type or a row cannot be built
+     */
+    @ManagedOperation(description = "Information about Camel Reactive publishers")
+    public TabularData camelPublishers() {
+        try {
+            final TabularData answer = new TabularDataSupport(publishersTabularType());
+            // The row type is identical for every entry, so take it from the table once instead of
+            // rebuilding an equal CompositeType for each publisher
+            final CompositeType ct = answer.getTabularType().getRowType();
+
+            publishers.forEach((name, publisher) -> {
+                try {
+                    // Typed local keeps the boxed value in line with the INTEGER item type of the row
+                    int subscriptionCount = publisher.getSubscriptionSize();
+
+                    // TODO: include more subscriber information, either as a nested table or flattened
+
+                    CompositeData data = new CompositeDataSupport(ct,
+                        new String[] {"name", "subscribers"},
+                        new Object[] {name, subscriptionCount});
+                    answer.put(data);
+                } catch (Exception e) {
+                    throw ObjectHelper.wrapRuntimeCamelException(e);
+                }
+            });
+
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    private static CompositeType subscriptionsCompositeType() throws OpenDataException {
+        return new CompositeType("subscriptions", "Subscriptions",
+            new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
+            new String[] {"Name", "Inflight", "Requested", "Buffer Size", "Back Pressure"},
+            new OpenType[] {SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING});
+    }
+
+    private static TabularType subscriptionsTabularType() throws OpenDataException {
+        CompositeType ct = subscriptionsCompositeType();
+        return new TabularType("subscriptions", "Information about Camel Reactive subscribers", ct, new String[]{"name"});
+    }
+
+    private static CompositeType publishersCompositeType() throws OpenDataException {
+        return new CompositeType("publishers", "Publishers",
+            new String[] {"name", "subscribers"},
+            new String[] {"Name", "Subscribers"},
+            new OpenType[] {SimpleType.STRING, SimpleType.INTEGER});
+    }
+
+    private static TabularType publishersTabularType() throws OpenDataException {
+        CompositeType ct = publishersCompositeType();
+        return new TabularType("publishers", "Information about Camel Reactive publishers", ct, new String[]{"name"});
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service
index 2ce7448..008f4fc 100644
--- a/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service
+++ b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.component.reactive.streams.engine.CamelReactiveStreamsServiceImpl
+class=org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService

http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java
index 149e317..d633f52 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.reactive.streams;
 
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
-import org.apache.camel.component.reactive.streams.engine.CamelReactiveStreamsServiceImpl;
+import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService;
 import org.apache.camel.component.reactive.streams.support.ReactiveStreamsTestService;
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -30,14 +30,14 @@ public class CamelReactiveStreamsTest extends CamelTestSupport {
     @Test
     public void testDefaultService() {
         CamelReactiveStreamsService service1 = CamelReactiveStreams.get(context, "default-service");
-        assertTrue(service1 instanceof CamelReactiveStreamsServiceImpl);
+        assertTrue(service1 instanceof DefaultCamelReactiveStreamsService);
     }
 
     @Test
     public void testSameDefaultServiceReturned() {
         CamelReactiveStreamsService service1 = CamelReactiveStreams.get(context, "default-service");
         CamelReactiveStreamsService service2 = CamelReactiveStreams.get(context, "default-service");
-        assertTrue(service1 instanceof CamelReactiveStreamsServiceImpl);
+        assertTrue(service1 instanceof DefaultCamelReactiveStreamsService);
         assertEquals(service1, service2);
     }