You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/01/27 14:35:02 UTC
[3/6] camel git commit: CAMEL-10650: adding lookup into registry for
named services
CAMEL-10650: adding lookup into registry for named services
CAMEL-10650: adding lookup into registry for named services
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9cbda1a1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9cbda1a1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9cbda1a1
Branch: refs/heads/master
Commit: 9cbda1a1042e0ce463fa4a2b57ca56369137b934
Parents: 40b42e6
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Jan 27 10:48:59 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Jan 27 15:31:31 2017 +0100
----------------------------------------------------------------------
.../streams/ReactiveStreamsConsumer.java | 4 +-
.../streams/ReactiveStreamsProducer.java | 2 +-
.../streams/api/CamelReactiveStreams.java | 136 ++++---------------
.../api/CamelReactiveStreamsService.java | 13 +-
.../reactive/streams/api/DispatchCallback.java | 1 +
.../engine/CamelReactiveStreamsServiceImpl.java | 5 +
components/readme.adoc | 2 +-
7 files changed, 43 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index ca19e0a..73d807f 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -51,13 +51,13 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
}
- CamelReactiveStreams.get(endpoint.getCamelContext()).getService().attachConsumer(endpoint.getStream(), this);
+ CamelReactiveStreams.get(endpoint.getCamelContext()).attachConsumer(endpoint.getStream(), this);
}
@Override
protected void doStop() throws Exception {
super.doStop();
- CamelReactiveStreams.get(endpoint.getCamelContext()).getService().detachConsumer(endpoint.getStream());
+ CamelReactiveStreams.get(endpoint.getCamelContext()).detachConsumer(endpoint.getStream());
if (executor != null) {
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
index d74cdb1..3d90179 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
@@ -51,7 +51,7 @@ public class ReactiveStreamsProducer<T> extends DefaultAsyncProducer {
@Override
protected void doStart() throws Exception {
super.doStart();
- this.service = CamelReactiveStreams.get(getEndpoint().getCamelContext()).getService();
+ this.service = CamelReactiveStreams.get(getEndpoint().getCamelContext());
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
index 1bafc5c..01bf23d 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
@@ -16,74 +16,53 @@
*/
package org.apache.camel.component.reactive.streams.api;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.engine.CamelReactiveStreamsServiceImpl;
-import org.apache.camel.spi.FactoryFinder;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
+import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the main entry-point for getting Camel streams associate to reactive-streams endpoints.
*
- * It delegates main methods to an instance of {@link CamelReactiveStreamsService}. This component provides
- * a default implementation that can be overridden in a 'META-INF/services/reactive-streams/reactiveStreamsService' file.
+ * It allows retrieving the {@link CamelReactiveStreamsService} used to access Camel streams.
+ * This class returns the default implementation of the service unless the client requests a named service.
*/
public final class CamelReactiveStreams {
private static final Logger LOG = LoggerFactory.getLogger(CamelReactiveStreams.class);
- private static Map<CamelContext, CamelReactiveStreams> instances = new ConcurrentHashMap<>();
-
- private CamelReactiveStreamsService service;
+ private CamelReactiveStreams() {
+ }
- private CamelReactiveStreams(CamelReactiveStreamsService service) {
- this.service = service;
+ public static CamelReactiveStreamsService get(CamelContext context) {
+ return get(context, null);
}
- public static CamelReactiveStreams get(CamelContext context) {
- instances.computeIfAbsent(context, ctx -> {
- CamelReactiveStreamsService service = resolveReactiveStreamsService(context);
+ public static CamelReactiveStreamsService get(CamelContext context, String serviceName) {
+ CamelReactiveStreamsService service = context.hasService(CamelReactiveStreamsService.class);
+ if (service == null) {
+ service = resolveReactiveStreamsService(context, serviceName);
try {
- ctx.addService(service, true, true);
+ context.addService(service, true, true);
} catch (Exception ex) {
throw new IllegalStateException("Cannot add the CamelReactiveStreamsService to the Camel context", ex);
}
- return new CamelReactiveStreams(service);
- });
-
- return instances.get(context);
- }
+ }
- @SuppressWarnings("unchecked")
- private static CamelReactiveStreamsService resolveReactiveStreamsService(CamelContext context) {
- Class<? extends CamelReactiveStreamsService> serviceClass = null;
- try {
- FactoryFinder finder = context.getFactoryFinder("META-INF/services/reactive-streams/");
- LOG.trace("Using FactoryFinder: {}", finder);
- serviceClass = (Class<? extends CamelReactiveStreamsService>) finder.findClass("reactiveStreamsService");
- } catch (ClassNotFoundException e) {
- LOG.trace("'reactive.streams.service.class' not found", e);
- } catch (IOException e) {
- LOG.trace("No reactive stream service defined in 'META-INF/services/org/apache/camel/component/'", e);
+ if (!ObjectHelper.equal(service.getName(), serviceName)) {
+ // only a single implementation of the CamelReactiveStreamService can be present per Camel context
+ throw new IllegalArgumentException("Cannot use two different implementations of CamelReactiveStreamsService in the same CamelContext: "
+ + "existing service name [" + service.getName() + "] - requested [" + serviceName + "]");
}
+ return service;
+ }
+
+ private static CamelReactiveStreamsService resolveReactiveStreamsService(CamelContext context, String serviceName) {
CamelReactiveStreamsService service = null;
- if (serviceClass != null) {
- try {
- service = serviceClass.newInstance();
- LOG.info("Created reactive stream service from class: " + serviceClass.getName());
- } catch (Exception e) {
- LOG.debug("Unable to create a reactive stream service of class " + serviceClass.getName(), e);
- }
+ if (serviceName != null) {
+ service = context.getRegistry().lookupByNameAndType(serviceName, CamelReactiveStreamsService.class);
}
if (service == null) {
@@ -94,73 +73,4 @@ public final class CamelReactiveStreams {
return service;
}
- /**
- * Allows retrieving the service responsible for binding camel routes to streams.
- *
- * @return the stream service
- */
- public CamelReactiveStreamsService getService() {
- return service;
- }
-
- /**
- * Returns the publisher associated to the given stream name.
- * A publisher can be used to push Camel exchanges to reactive-streams subscribers.
- *
- * @param name the stream name
- * @return the stream publisher
- */
- public Publisher<Exchange> getPublisher(String name) {
- Objects.requireNonNull(name, "name cannot be null");
-
- return service.getPublisher(name);
- }
-
- /**
- * Returns the publisher associated to the given stream name.
- * A publisher can be used to push Camel exchange to external reactive-streams subscribers.
- *
- * The publisher converts automatically exchanges to the given type.
- *
- * @param name the stream name
- * @param type the type of the emitted items
- * @param <T> the type of items emitted by the publisher
- * @return the publisher associated to the stream
- */
- public <T> Publisher<T> getPublisher(String name, Class<T> type) {
- Objects.requireNonNull(name, "name cannot be null");
-
- return service.getPublisher(name, type);
- }
-
- /**
- * Returns the subscriber associated to the given stream name.
- * A subscriber can be used to push items coming from external reactive-streams publishers to Camel routes.
- *
- * @param name the stream name
- * @return the subscriber associated with the stream
- */
- public Subscriber<Exchange> getSubscriber(String name) {
- Objects.requireNonNull(name, "name cannot be null");
-
- return service.getSubscriber(name);
- }
-
- /**
- * Returns the subscriber associated to the given stream name.
- * A subscriber can be used to push items coming from external reactive-streams publishers to Camel routes.
- *
- * The subscriber converts automatically items of the given type to exchanges before pushing them.
- *
- * @param name the stream name
- * @param type the publisher converts automatically exchanges to the given type.
- * @param <T> the type of items accepted by the subscriber
- * @return the subscriber associated with the stream
- */
- public <T> Subscriber<T> getSubscriber(String name, Class<T> type) {
- Objects.requireNonNull(name, "name cannot be null");
-
- return service.getSubscriber(name, type);
- }
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index 147ece7..a397ce9 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -81,7 +81,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
*/
/**
- * Sends the exchange to all active subscriptions on the given stream.
+ * Used by Camel to send the exchange to all active subscriptions on the given stream.
* The callback is used to signal that the exchange has been delivered to the subscribers.
*
* @param name the stream name
@@ -95,7 +95,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
*/
/**
- * Associate the subscriber of the stream with the given name to a specific Camel consumer.
+ * Used by Camel to associate the subscriber of the stream with the given name to a specific Camel consumer.
* This method is used to bind a Camel route to a reactive stream.
*
* @param name the stream name
@@ -105,10 +105,17 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
void attachConsumer(String name, ReactiveStreamsConsumer consumer);
/**
- * Deassociate the existing consumer from the given stream.
+ * Used by Camel to detach the existing consumer from the given stream.
*
* @param name the stream name
*/
void detachConsumer(String name);
+ /**
+ * Returns the name of this service implementation if present.
+ * The name of any named implementation must match their lookup key in the registry.
+ * @return the name of the service implementation or null (for the default implementation)
+ */
+ String getName();
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java
index c07f3d3..01785d4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.reactive.streams.api;
/**
* A callback used to signal when a item coming from a Camel route has been delivered to the external stream processor.
*/
+@FunctionalInterface
public interface DispatchCallback<T> {
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index fb6b693..ac171de 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -130,4 +130,9 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
public CamelContext getCamelContext() {
return this.context;
}
+
+ @Override
+ public String getName() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/readme.adoc
----------------------------------------------------------------------
diff --git a/components/readme.adoc b/components/readme.adoc
index b8fb246..190f8ea 100644
--- a/components/readme.adoc
+++ b/components/readme.adoc
@@ -439,7 +439,7 @@ Components
`rabbitmq:hostname:portNumber/exchangeName` | The rabbitmq component allows you produce and consume messages from RabbitMQ instances.
| link:camel-reactive-streams/src/main/docs/reactive-streams-component.adoc[Reactive Streams] (camel-reactive-streams) +
-`reactive-streams:/stream` | The Camel reactive-streams endpoint.
+`reactive-streams:stream` | The Camel reactive-streams endpoint.
| link:camel-restlet/src/main/docs/restlet-component.adoc[Restlet] (camel-restlet) +
`restlet:protocol:host:port/uriPattern` | Component for consuming and producing Restful resources using Restlet.