You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/09 08:26:27 UTC
camel git commit: CAMEL-11123: Rename Impl to Default which is the
naming convention we typically use in camel.
Repository: camel
Updated Branches:
refs/heads/master cf6a7414b -> 96bbc91cf
CAMEL-11123: Rename Impl to Default which is the naming convention we typically use in camel.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/96bbc91c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/96bbc91c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/96bbc91c
Branch: refs/heads/master
Commit: 96bbc91cfc1abac48a659493354036cb6ccc01b2
Parents: cf6a741
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 9 10:25:44 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 9 10:25:44 2017 +0200
----------------------------------------------------------------------
.../engine/CamelReactiveStreamsServiceImpl.java | 418 -------------------
.../DefaultCamelReactiveStreamsService.java | 418 +++++++++++++++++++
.../camel/reactive-streams/default-service | 2 +-
.../streams/CamelReactiveStreamsTest.java | 6 +-
4 files changed, 422 insertions(+), 422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
deleted file mode 100644
index 3d23ce7..0000000
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.reactive.streams.engine;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.api.management.ManagedOperation;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
-import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
-import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
-import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
-import org.apache.camel.component.reactive.streams.api.DispatchCallback;
-import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
-import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
-import org.apache.camel.component.reactive.streams.util.MonoPublisher;
-import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.spi.Synchronization;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-
-/**
- * The default implementation of the reactive streams service.
- */
-@ManagedResource(description = "Managed CamelReactiveStreamsService")
-public class CamelReactiveStreamsServiceImpl extends ServiceSupport implements CamelReactiveStreamsService {
-
- private CamelContext context;
-
- private ExecutorService workerPool;
-
- private final Map<String, CamelPublisher> publishers = new ConcurrentHashMap<>();
-
- private final Map<String, CamelSubscriber> subscribers = new ConcurrentHashMap<>();
-
- private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>();
-
- private final Map<String, String> requestedUriToStream = new ConcurrentHashMap<>();
-
- public CamelReactiveStreamsServiceImpl() {
- }
-
- @Override
- protected void doStart() throws Exception {
- ReactiveStreamsComponent component = context.getComponent("reactive-streams", ReactiveStreamsComponent.class);
- ReactiveStreamsEngineConfiguration config = component.getInternalEngineConfiguration();
- this.workerPool = context.getExecutorServiceManager().newThreadPool(this, config.getThreadPoolName(), config.getThreadPoolMinSize(), config.getThreadPoolMaxSize());
- }
-
- @Override
- protected void doStop() throws Exception {
- if (this.workerPool != null) {
- context.getExecutorServiceManager().shutdownNow(this.workerPool);
- this.workerPool = null;
- }
- }
-
- @Override
- public Publisher<Exchange> fromStream(String name) {
- return new UnwrappingPublisher<>(getPayloadPublisher(name));
- }
-
- @SuppressWarnings("unchecked")
- public <T> Publisher<T> fromStream(String name, Class<T> cls) {
- if (Exchange.class.equals(cls)) {
- return (Publisher<T>) fromStream(name);
- }
-
- return new ConvertingPublisher<T>(fromStream(name), cls);
- }
-
- @Override
- public CamelSubscriber streamSubscriber(String name) {
- subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name));
- return subscribers.get(name);
- }
-
- @SuppressWarnings("unchecked")
- public <T> Subscriber<T> streamSubscriber(String name, Class<T> type) {
- if (Exchange.class.equals(type)) {
- return (Subscriber<T>) streamSubscriber(name);
- }
-
- return new ConvertingSubscriber<T>(streamSubscriber(name), getCamelContext());
- }
-
- @Override
- public void sendCamelExchange(String name, Exchange exchange, DispatchCallback<Exchange> callback) {
- StreamPayload<Exchange> payload = new StreamPayload<>(exchange, callback);
- getPayloadPublisher(name).publish(payload);
- }
-
- @Override
- public Publisher<Exchange> toStream(String name, Object data) {
- Exchange exchange = convertToExchange(data);
- return doRequest(name, exchange);
- }
-
- @Override
- public Function<?, ? extends Publisher<Exchange>> toStream(String name) {
- return data -> toStream(name, data);
- }
-
- @Override
- public <T> Publisher<T> toStream(String name, Object data, Class<T> type) {
- return new ConvertingPublisher<>(toStream(name, data), type);
- }
-
- protected Publisher<Exchange> doRequest(String name, Exchange data) {
- ReactiveStreamsConsumer consumer = streamSubscriber(name).getConsumer();
- if (consumer == null) {
- throw new IllegalStateException("No consumers attached to the stream " + name);
- }
-
- DelayedMonoPublisher<Exchange> publisher = new DelayedMonoPublisher<>(this.workerPool);
-
- data.addOnCompletion(new Synchronization() {
- @Override
- public void onComplete(Exchange exchange) {
- publisher.setData(exchange);
- }
-
- @Override
- public void onFailure(Exchange exchange) {
- Throwable throwable = exchange.getException();
- if (throwable == null) {
- throwable = new IllegalStateException("Unknown Exception");
- }
- publisher.setException(throwable);
- }
- });
-
- consumer.process(data, doneSync -> {
- });
-
- return publisher;
- }
-
- @Override
- public <T> Function<Object, Publisher<T>> toStream(String name, Class<T> type) {
- return data -> toStream(name, data, type);
- }
-
- private CamelPublisher getPayloadPublisher(String name) {
- publishers.computeIfAbsent(name, n -> new CamelPublisher(this.workerPool, this.context, n));
- return publishers.get(name);
- }
-
- @Override
- public Publisher<Exchange> from(String uri) {
- publishedUriToStream.computeIfAbsent(uri, u -> {
- try {
- String uuid = context.getUuidGenerator().generateUuid();
- new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from(u)
- .to("reactive-streams:" + uuid);
- }
- }.addRoutesToCamelContext(context);
-
- return uuid;
- } catch (Exception e) {
- throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e);
- }
- });
- return fromStream(publishedUriToStream.get(uri));
- }
-
- @Override
- public <T> Publisher<T> from(String uri, Class<T> type) {
- return new ConvertingPublisher<T>(from(uri), type);
- }
-
- @Override
- public Subscriber<Exchange> subscriber(String uri) {
- try {
- String uuid = context.getUuidGenerator().generateUuid();
- new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("reactive-streams:" + uuid)
- .to(uri);
- }
- }.addRoutesToCamelContext(context);
-
- return streamSubscriber(uuid);
- } catch (Exception e) {
- throw new IllegalStateException("Unable to create source reactive stream towards direct URI: " + uri, e);
- }
- }
-
- @Override
- public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
- return new ConvertingSubscriber<T>(subscriber(uri), context);
- }
-
- @Override
- public Publisher<Exchange> to(String uri, Object data) {
- requestedUriToStream.computeIfAbsent(uri, u -> {
- try {
- String uuid = context.getUuidGenerator().generateUuid();
- new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("reactive-streams:" + uuid)
- .to(u);
- }
- }.addRoutesToCamelContext(context);
-
- return uuid;
- } catch (Exception e) {
- throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e);
- }
- });
- return toStream(requestedUriToStream.get(uri), data);
- }
-
- @Override
- public Function<Object, Publisher<Exchange>> to(String uri) {
- return data -> to(uri, data);
- }
-
- @Override
- public <T> Publisher<T> to(String uri, Object data, Class<T> type) {
- return new ConvertingPublisher<T>(to(uri, data), type);
- }
-
- @Override
- public <T> Function<Object, Publisher<T>> to(String uri, Class<T> type) {
- return data -> to(uri, data, type);
- }
-
- @Override
- public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) {
- try {
- new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from(uri)
- .process(exchange -> {
- Exchange copy = exchange.copy();
- Object result = processor.apply(new MonoPublisher<>(copy));
- exchange.getIn().setBody(result);
- })
- .process(new UnwrapStreamProcessor());
- }
- }.addRoutesToCamelContext(context);
- } catch (Exception e) {
- throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + uri, e);
- }
- }
-
- @Override
- public <T> void process(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
- process(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type)));
- }
-
- @Override
- public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
- CamelSubscriber subscriber = streamSubscriber(name);
- subscriber.attachConsumer(consumer);
- return subscriber;
- }
-
- @Override
- public void detachCamelConsumer(String name) {
- streamSubscriber(name).detachConsumer();
- }
-
- @Override
- public void attachCamelProducer(String name, ReactiveStreamsProducer producer) {
- getPayloadPublisher(name).attachProducer(producer);
- }
-
- @Override
- public void detachCamelProducer(String name) {
- getPayloadPublisher(name).detachProducer();
- }
-
- @Override
- public void setCamelContext(CamelContext camelContext) {
- this.context = camelContext;
- }
-
- @Override
- public CamelContext getCamelContext() {
- return this.context;
- }
-
- private Exchange convertToExchange(Object data) {
- Exchange exchange;
- if (data instanceof Exchange) {
- exchange = (Exchange) data;
- } else {
- exchange = new DefaultExchange(context);
- exchange.setPattern(ExchangePattern.InOut);
- exchange.getIn().setBody(data);
- }
-
- return exchange;
- }
-
- @ManagedOperation(description = "Information about Camel Reactive subscribers")
- public TabularData camelSubscribers() {
- try {
- final TabularData answer = new TabularDataSupport(subscriptionsTabularType());
-
- subscribers.forEach((k, v) -> {
- try {
- String name = k;
- long inflight = v.getInflightCount();
- long requested = v.getRequested();
- long bufferSize = v.getBufferSize();
- String backpressure = v.getBackpressureStrategy() != null ? v.getBackpressureStrategy().name() : "";
-
- CompositeType ct = subscriptionsCompositeType();
- CompositeData data = new CompositeDataSupport(ct,
- new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
- new Object[] {name, inflight, requested, bufferSize, backpressure});
- answer.put(data);
- } catch (Exception e) {
- throw ObjectHelper.wrapRuntimeCamelException(e);
- }
- });
-
- return answer;
- } catch (Exception e) {
- throw ObjectHelper.wrapRuntimeCamelException(e);
- }
- }
-
- @ManagedOperation(description = "Information about Camel Reactive publishers")
- public TabularData camelPublishers() {
- try {
- final TabularData answer = new TabularDataSupport(publishersTabularType());
-
- publishers.forEach((k, v) -> {
- try {
- String name = k;
- int subscribers = v.getSubscriptionSize();
-
- // TODO: include more subscriber information, either as a nested table or flattern
-
- CompositeType ct = publishersCompositeType();
- CompositeData data = new CompositeDataSupport(ct,
- new String[] {"name", "subscribers"},
- new Object[] {name, subscribers});
- answer.put(data);
- } catch (Exception e) {
- throw ObjectHelper.wrapRuntimeCamelException(e);
- }
- });
-
- return answer;
- } catch (Exception e) {
- throw ObjectHelper.wrapRuntimeCamelException(e);
- }
- }
-
- private static CompositeType subscriptionsCompositeType() throws OpenDataException {
- return new CompositeType("subscriptions", "Subscriptions",
- new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
- new String[] {"Name", "Inflight", "Requested", "Buffer Size", "Back Pressure"},
- new OpenType[] {SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING});
- }
-
- private static TabularType subscriptionsTabularType() throws OpenDataException {
- CompositeType ct = subscriptionsCompositeType();
- return new TabularType("subscriptions", "Information about Camel Reactive subscribers", ct, new String[]{"name"});
- }
-
- private static CompositeType publishersCompositeType() throws OpenDataException {
- return new CompositeType("publishers", "Publishers",
- new String[] {"name", "subscribers"},
- new String[] {"Name", "Subscribers"},
- new OpenType[] {SimpleType.STRING, SimpleType.INTEGER});
- }
-
- private static TabularType publishersTabularType() throws OpenDataException {
- CompositeType ct = publishersCompositeType();
- return new TabularType("publishers", "Information about Camel Reactive publishers", ct, new String[]{"name"});
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
new file mode 100644
index 0000000..3d8f968
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.engine;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.api.DispatchCallback;
+import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
+import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
+import org.apache.camel.component.reactive.streams.util.MonoPublisher;
+import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+
+/**
+ * The default implementation of the reactive streams service.
+ */
+@ManagedResource(description = "Managed CamelReactiveStreamsService")
+public class DefaultCamelReactiveStreamsService extends ServiceSupport implements CamelReactiveStreamsService {
+
+ private CamelContext context;
+
+ private ExecutorService workerPool;
+
+ private final Map<String, CamelPublisher> publishers = new ConcurrentHashMap<>();
+
+ private final Map<String, CamelSubscriber> subscribers = new ConcurrentHashMap<>();
+
+ private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>();
+
+ private final Map<String, String> requestedUriToStream = new ConcurrentHashMap<>();
+
+ public DefaultCamelReactiveStreamsService() {
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ReactiveStreamsComponent component = context.getComponent("reactive-streams", ReactiveStreamsComponent.class);
+ ReactiveStreamsEngineConfiguration config = component.getInternalEngineConfiguration();
+ this.workerPool = context.getExecutorServiceManager().newThreadPool(this, config.getThreadPoolName(), config.getThreadPoolMinSize(), config.getThreadPoolMaxSize());
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (this.workerPool != null) {
+ context.getExecutorServiceManager().shutdownNow(this.workerPool);
+ this.workerPool = null;
+ }
+ }
+
+ @Override
+ public Publisher<Exchange> fromStream(String name) {
+ return new UnwrappingPublisher<>(getPayloadPublisher(name));
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> Publisher<T> fromStream(String name, Class<T> cls) {
+ if (Exchange.class.equals(cls)) {
+ return (Publisher<T>) fromStream(name);
+ }
+
+ return new ConvertingPublisher<T>(fromStream(name), cls);
+ }
+
+ @Override
+ public CamelSubscriber streamSubscriber(String name) {
+ subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name));
+ return subscribers.get(name);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> Subscriber<T> streamSubscriber(String name, Class<T> type) {
+ if (Exchange.class.equals(type)) {
+ return (Subscriber<T>) streamSubscriber(name);
+ }
+
+ return new ConvertingSubscriber<T>(streamSubscriber(name), getCamelContext());
+ }
+
+ @Override
+ public void sendCamelExchange(String name, Exchange exchange, DispatchCallback<Exchange> callback) {
+ StreamPayload<Exchange> payload = new StreamPayload<>(exchange, callback);
+ getPayloadPublisher(name).publish(payload);
+ }
+
+ @Override
+ public Publisher<Exchange> toStream(String name, Object data) {
+ Exchange exchange = convertToExchange(data);
+ return doRequest(name, exchange);
+ }
+
+ @Override
+ public Function<?, ? extends Publisher<Exchange>> toStream(String name) {
+ return data -> toStream(name, data);
+ }
+
+ @Override
+ public <T> Publisher<T> toStream(String name, Object data, Class<T> type) {
+ return new ConvertingPublisher<>(toStream(name, data), type);
+ }
+
+ protected Publisher<Exchange> doRequest(String name, Exchange data) {
+ ReactiveStreamsConsumer consumer = streamSubscriber(name).getConsumer();
+ if (consumer == null) {
+ throw new IllegalStateException("No consumers attached to the stream " + name);
+ }
+
+ DelayedMonoPublisher<Exchange> publisher = new DelayedMonoPublisher<>(this.workerPool);
+
+ data.addOnCompletion(new Synchronization() {
+ @Override
+ public void onComplete(Exchange exchange) {
+ publisher.setData(exchange);
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ Throwable throwable = exchange.getException();
+ if (throwable == null) {
+ throwable = new IllegalStateException("Unknown Exception");
+ }
+ publisher.setException(throwable);
+ }
+ });
+
+ consumer.process(data, doneSync -> {
+ });
+
+ return publisher;
+ }
+
+ @Override
+ public <T> Function<Object, Publisher<T>> toStream(String name, Class<T> type) {
+ return data -> toStream(name, data, type);
+ }
+
+ private CamelPublisher getPayloadPublisher(String name) {
+ publishers.computeIfAbsent(name, n -> new CamelPublisher(this.workerPool, this.context, n));
+ return publishers.get(name);
+ }
+
+ @Override
+ public Publisher<Exchange> from(String uri) {
+ publishedUriToStream.computeIfAbsent(uri, u -> {
+ try {
+ String uuid = context.getUuidGenerator().generateUuid();
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(u)
+ .to("reactive-streams:" + uuid);
+ }
+ }.addRoutesToCamelContext(context);
+
+ return uuid;
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e);
+ }
+ });
+ return fromStream(publishedUriToStream.get(uri));
+ }
+
+ @Override
+ public <T> Publisher<T> from(String uri, Class<T> type) {
+ return new ConvertingPublisher<T>(from(uri), type);
+ }
+
+ @Override
+ public Subscriber<Exchange> subscriber(String uri) {
+ try {
+ String uuid = context.getUuidGenerator().generateUuid();
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("reactive-streams:" + uuid)
+ .to(uri);
+ }
+ }.addRoutesToCamelContext(context);
+
+ return streamSubscriber(uuid);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create source reactive stream towards direct URI: " + uri, e);
+ }
+ }
+
+ @Override
+ public <T> Subscriber<T> subscriber(String uri, Class<T> type) {
+ return new ConvertingSubscriber<T>(subscriber(uri), context);
+ }
+
+ @Override
+ public Publisher<Exchange> to(String uri, Object data) {
+ requestedUriToStream.computeIfAbsent(uri, u -> {
+ try {
+ String uuid = context.getUuidGenerator().generateUuid();
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("reactive-streams:" + uuid)
+ .to(u);
+ }
+ }.addRoutesToCamelContext(context);
+
+ return uuid;
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e);
+ }
+ });
+ return toStream(requestedUriToStream.get(uri), data);
+ }
+
+ @Override
+ public Function<Object, Publisher<Exchange>> to(String uri) {
+ return data -> to(uri, data);
+ }
+
+ @Override
+ public <T> Publisher<T> to(String uri, Object data, Class<T> type) {
+ return new ConvertingPublisher<T>(to(uri, data), type);
+ }
+
+ @Override
+ public <T> Function<Object, Publisher<T>> to(String uri, Class<T> type) {
+ return data -> to(uri, data, type);
+ }
+
+ @Override
+ public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) {
+ try {
+ new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(uri)
+ .process(exchange -> {
+ Exchange copy = exchange.copy();
+ Object result = processor.apply(new MonoPublisher<>(copy));
+ exchange.getIn().setBody(result);
+ })
+ .process(new UnwrapStreamProcessor());
+ }
+ }.addRoutesToCamelContext(context);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + uri, e);
+ }
+ }
+
+ @Override
+ public <T> void process(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) {
+ process(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type)));
+ }
+
+ @Override
+ public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) {
+ CamelSubscriber subscriber = streamSubscriber(name);
+ subscriber.attachConsumer(consumer);
+ return subscriber;
+ }
+
+ @Override
+ public void detachCamelConsumer(String name) {
+ streamSubscriber(name).detachConsumer();
+ }
+
+ @Override
+ public void attachCamelProducer(String name, ReactiveStreamsProducer producer) {
+ getPayloadPublisher(name).attachProducer(producer);
+ }
+
+ @Override
+ public void detachCamelProducer(String name) {
+ getPayloadPublisher(name).detachProducer();
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.context = camelContext;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return this.context;
+ }
+
+ private Exchange convertToExchange(Object data) {
+ Exchange exchange;
+ if (data instanceof Exchange) {
+ exchange = (Exchange) data;
+ } else {
+ exchange = new DefaultExchange(context);
+ exchange.setPattern(ExchangePattern.InOut);
+ exchange.getIn().setBody(data);
+ }
+
+ return exchange;
+ }
+
+ @ManagedOperation(description = "Information about Camel Reactive subscribers")
+ public TabularData camelSubscribers() {
+ try {
+ final TabularData answer = new TabularDataSupport(subscriptionsTabularType());
+
+ subscribers.forEach((k, v) -> {
+ try {
+ String name = k;
+ long inflight = v.getInflightCount();
+ long requested = v.getRequested();
+ long bufferSize = v.getBufferSize();
+ String backpressure = v.getBackpressureStrategy() != null ? v.getBackpressureStrategy().name() : "";
+
+ CompositeType ct = subscriptionsCompositeType();
+ CompositeData data = new CompositeDataSupport(ct,
+ new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
+ new Object[] {name, inflight, requested, bufferSize, backpressure});
+ answer.put(data);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ });
+
+ return answer;
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ @ManagedOperation(description = "Information about Camel Reactive publishers")
+ public TabularData camelPublishers() {
+ try {
+ final TabularData answer = new TabularDataSupport(publishersTabularType());
+
+ publishers.forEach((k, v) -> {
+ try {
+ String name = k;
+ int subscribers = v.getSubscriptionSize();
+
+ // TODO: include more subscriber information, either as a nested table or flattern
+
+ CompositeType ct = publishersCompositeType();
+ CompositeData data = new CompositeDataSupport(ct,
+ new String[] {"name", "subscribers"},
+ new Object[] {name, subscribers});
+ answer.put(data);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ });
+
+ return answer;
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+
+ private static CompositeType subscriptionsCompositeType() throws OpenDataException {
+ return new CompositeType("subscriptions", "Subscriptions",
+ new String[] {"name", "inflight", "requested", "buffer size", "back pressure"},
+ new String[] {"Name", "Inflight", "Requested", "Buffer Size", "Back Pressure"},
+ new OpenType[] {SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING});
+ }
+
+ private static TabularType subscriptionsTabularType() throws OpenDataException {
+ CompositeType ct = subscriptionsCompositeType();
+ return new TabularType("subscriptions", "Information about Camel Reactive subscribers", ct, new String[]{"name"});
+ }
+
+ private static CompositeType publishersCompositeType() throws OpenDataException {
+ return new CompositeType("publishers", "Publishers",
+ new String[] {"name", "subscribers"},
+ new String[] {"Name", "Subscribers"},
+ new OpenType[] {SimpleType.STRING, SimpleType.INTEGER});
+ }
+
+ private static TabularType publishersTabularType() throws OpenDataException {
+ CompositeType ct = publishersCompositeType();
+ return new TabularType("publishers", "Information about Camel Reactive publishers", ct, new String[]{"name"});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service
index 2ce7448..008f4fc 100644
--- a/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service
+++ b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service
@@ -15,4 +15,4 @@
# limitations under the License.
#
-class=org.apache.camel.component.reactive.streams.engine.CamelReactiveStreamsServiceImpl
+class=org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService
http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java
index 149e317..d633f52 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.reactive.streams;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
-import org.apache.camel.component.reactive.streams.engine.CamelReactiveStreamsServiceImpl;
+import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.support.ReactiveStreamsTestService;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
@@ -30,14 +30,14 @@ public class CamelReactiveStreamsTest extends CamelTestSupport {
@Test
public void testDefaultService() {
CamelReactiveStreamsService service1 = CamelReactiveStreams.get(context, "default-service");
- assertTrue(service1 instanceof CamelReactiveStreamsServiceImpl);
+ assertTrue(service1 instanceof DefaultCamelReactiveStreamsService);
}
@Test
public void testSameDefaultServiceReturned() {
CamelReactiveStreamsService service1 = CamelReactiveStreams.get(context, "default-service");
CamelReactiveStreamsService service2 = CamelReactiveStreams.get(context, "default-service");
- assertTrue(service1 instanceof CamelReactiveStreamsServiceImpl);
+ assertTrue(service1 instanceof DefaultCamelReactiveStreamsService);
assertEquals(service1, service2);
}