You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/01/27 14:35:02 UTC

[3/6] camel git commit: CAMEL-10650: adding lookup into registry for named services

CAMEL-10650: adding lookup into registry for named services

CAMEL-10650: adding lookup into registry for named services


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9cbda1a1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9cbda1a1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9cbda1a1

Branch: refs/heads/master
Commit: 9cbda1a1042e0ce463fa4a2b57ca56369137b934
Parents: 40b42e6
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Jan 27 10:48:59 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Jan 27 15:31:31 2017 +0100

----------------------------------------------------------------------
 .../streams/ReactiveStreamsConsumer.java        |   4 +-
 .../streams/ReactiveStreamsProducer.java        |   2 +-
 .../streams/api/CamelReactiveStreams.java       | 136 ++++---------------
 .../api/CamelReactiveStreamsService.java        |  13 +-
 .../reactive/streams/api/DispatchCallback.java  |   1 +
 .../engine/CamelReactiveStreamsServiceImpl.java |   5 +
 components/readme.adoc                          |   2 +-
 7 files changed, 43 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index ca19e0a..73d807f 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -51,13 +51,13 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
             executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
         }
 
-        CamelReactiveStreams.get(endpoint.getCamelContext()).getService().attachConsumer(endpoint.getStream(), this);
+        CamelReactiveStreams.get(endpoint.getCamelContext()).attachConsumer(endpoint.getStream(), this);
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        CamelReactiveStreams.get(endpoint.getCamelContext()).getService().detachConsumer(endpoint.getStream());
+        CamelReactiveStreams.get(endpoint.getCamelContext()).detachConsumer(endpoint.getStream());
 
         if (executor != null) {
             endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);

http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
index d74cdb1..3d90179 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
@@ -51,7 +51,7 @@ public class ReactiveStreamsProducer<T> extends DefaultAsyncProducer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        this.service = CamelReactiveStreams.get(getEndpoint().getCamelContext()).getService();
+        this.service = CamelReactiveStreams.get(getEndpoint().getCamelContext());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
index 1bafc5c..01bf23d 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
@@ -16,74 +16,53 @@
  */
 package org.apache.camel.component.reactive.streams.api;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.engine.CamelReactiveStreamsServiceImpl;
-import org.apache.camel.spi.FactoryFinder;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * This is the main entry-point for getting Camel streams associate to reactive-streams endpoints.
  *
- * It delegates main methods to an instance of {@link CamelReactiveStreamsService}. This component provides
- * a default implementation that can be overridden in a 'META-INF/services/reactive-streams/reactiveStreamsService' file.
+ * It allows to retrieve the {@link CamelReactiveStreamsService} to access Camel streams.
+ * This class returns the default implementation of the service unless the client requests a named service,
  */
 public final class CamelReactiveStreams {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelReactiveStreams.class);
 
-    private static Map<CamelContext, CamelReactiveStreams> instances = new ConcurrentHashMap<>();
-
-    private CamelReactiveStreamsService service;
+    private CamelReactiveStreams() {
+    }
 
-    private CamelReactiveStreams(CamelReactiveStreamsService service) {
-        this.service = service;
+    public static CamelReactiveStreamsService get(CamelContext context) {
+        return get(context, null);
     }
 
-    public static CamelReactiveStreams get(CamelContext context) {
-        instances.computeIfAbsent(context, ctx -> {
-            CamelReactiveStreamsService service = resolveReactiveStreamsService(context);
+    public static CamelReactiveStreamsService get(CamelContext context, String serviceName) {
+        CamelReactiveStreamsService service = context.hasService(CamelReactiveStreamsService.class);
+        if (service == null) {
+            service = resolveReactiveStreamsService(context, serviceName);
             try {
-                ctx.addService(service, true, true);
+                context.addService(service, true, true);
             } catch (Exception ex) {
                 throw new IllegalStateException("Cannot add the CamelReactiveStreamsService to the Camel context", ex);
             }
-            return new CamelReactiveStreams(service);
-        });
-
-        return instances.get(context);
-    }
+        }
 
-    @SuppressWarnings("unchecked")
-    private static CamelReactiveStreamsService resolveReactiveStreamsService(CamelContext context) {
-        Class<? extends CamelReactiveStreamsService> serviceClass = null;
-        try {
-            FactoryFinder finder = context.getFactoryFinder("META-INF/services/reactive-streams/");
-            LOG.trace("Using FactoryFinder: {}", finder);
-            serviceClass = (Class<? extends CamelReactiveStreamsService>) finder.findClass("reactiveStreamsService");
-        } catch (ClassNotFoundException e) {
-            LOG.trace("'reactive.streams.service.class' not found", e);
-        } catch (IOException e) {
-            LOG.trace("No reactive stream service defined in 'META-INF/services/org/apache/camel/component/'", e);
+        if (!ObjectHelper.equal(service.getName(), serviceName)) {
+            // only a single implementation of the CamelReactiveStreamService can be present per Camel context
+            throw new IllegalArgumentException("Cannot use two different implementations of CamelReactiveStreamsService in the same CamelContext: "
+                    + "existing service name [" + service.getName() + "] - requested [" + serviceName + "]");
         }
 
+        return service;
+    }
+
+    private static CamelReactiveStreamsService resolveReactiveStreamsService(CamelContext context, String serviceName) {
         CamelReactiveStreamsService service = null;
-        if (serviceClass != null) {
-            try {
-                service = serviceClass.newInstance();
-                LOG.info("Created reactive stream service from class: " + serviceClass.getName());
-            } catch (Exception e) {
-                LOG.debug("Unable to create a reactive stream service of class " + serviceClass.getName(), e);
-            }
+        if (serviceName != null) {
+            service = context.getRegistry().lookupByNameAndType(serviceName, CamelReactiveStreamsService.class);
         }
 
         if (service == null) {
@@ -94,73 +73,4 @@ public final class CamelReactiveStreams {
         return service;
     }
 
-    /**
-     * Allows retrieving the service responsible for binding camel routes to streams.
-     *
-     * @return the stream service
-     */
-    public CamelReactiveStreamsService getService() {
-        return service;
-    }
-
-    /**
-     * Returns the publisher associated to the given stream name.
-     * A publisher can be used to push Camel exchanges to reactive-streams subscribers.
-     *
-     * @param name the stream name
-     * @return the stream publisher
-     */
-    public Publisher<Exchange> getPublisher(String name) {
-        Objects.requireNonNull(name, "name cannot be null");
-
-        return service.getPublisher(name);
-    }
-
-    /**
-     * Returns the publisher associated to the given stream name.
-     * A publisher can be used to push Camel exchange to external reactive-streams subscribers.
-     *
-     * The publisher converts automatically exchanges to the given type.
-     *
-     * @param name the stream name
-     * @param type the type of the emitted items
-     * @param <T> the type of items emitted by the publisher
-     * @return the publisher associated to the stream
-     */
-    public <T> Publisher<T> getPublisher(String name, Class<T> type) {
-        Objects.requireNonNull(name, "name cannot be null");
-
-        return service.getPublisher(name, type);
-    }
-
-    /**
-     * Returns the subscriber associated to the given stream name.
-     * A subscriber can be used to push items coming from external reactive-streams publishers to Camel routes.
-     *
-     * @param name the stream name
-     * @return the subscriber associated with the stream
-     */
-    public Subscriber<Exchange> getSubscriber(String name) {
-        Objects.requireNonNull(name, "name cannot be null");
-
-        return service.getSubscriber(name);
-    }
-
-    /**
-     * Returns the subscriber associated to the given stream name.
-     * A subscriber can be used to push items coming from external reactive-streams publishers to Camel routes.
-     *
-     * The subscriber converts automatically items of the given type to exchanges before pushing them.
-     *
-     * @param name the stream name
-     * @param type the publisher converts automatically exchanges to the given type.
-     * @param <T> the type of items accepted by the subscriber
-     * @return the subscriber associated with the stream
-     */
-    public <T> Subscriber<T> getSubscriber(String name, Class<T> type) {
-        Objects.requireNonNull(name, "name cannot be null");
-
-        return service.getSubscriber(name, type);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index 147ece7..a397ce9 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -81,7 +81,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      */
 
     /**
-     * Sends the exchange to all active subscriptions on the given stream.
+     * Used by Camel to send the exchange to all active subscriptions on the given stream.
      * The callback is used to signal that the exchange has been delivered to the subscribers.
      *
      * @param name the stream name
@@ -95,7 +95,7 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
      */
 
     /**
-     * Associate the subscriber of the stream with the given name to a specific Camel consumer.
+     * Used by Camel to associate the subscriber of the stream with the given name to a specific Camel consumer.
      * This method is used to bind a Camel route to a reactive stream.
      *
      * @param name the stream name
@@ -105,10 +105,17 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service
     void attachConsumer(String name, ReactiveStreamsConsumer consumer);
 
     /**
-     * Deassociate the existing consumer from the given stream.
+     * Used by Camel to detach the existing consumer from the given stream.
      *
      * @param name the stream name
      */
     void detachConsumer(String name);
 
+    /**
+     * Returns the name of this service implementation if present.
+     * The name of any named implementation must match their lookup key in the registry.
+     * @return the name of the service implementation or null (for the default implementation)
+     */
+    String getName();
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java
index c07f3d3..01785d4 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/DispatchCallback.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.reactive.streams.api;
 /**
  * A callback used to signal when a item coming from a Camel route has been delivered to the external stream processor.
  */
+@FunctionalInterface
 public interface DispatchCallback<T> {
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index fb6b693..ac171de 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -130,4 +130,9 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ
     public CamelContext getCamelContext() {
         return this.context;
     }
+
+    @Override
+    public String getName() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9cbda1a1/components/readme.adoc
----------------------------------------------------------------------
diff --git a/components/readme.adoc b/components/readme.adoc
index b8fb246..190f8ea 100644
--- a/components/readme.adoc
+++ b/components/readme.adoc
@@ -439,7 +439,7 @@ Components
 `rabbitmq:hostname:portNumber/exchangeName` | The rabbitmq component allows you produce and consume messages from RabbitMQ instances.
 
 | link:camel-reactive-streams/src/main/docs/reactive-streams-component.adoc[Reactive Streams] (camel-reactive-streams) +
-`reactive-streams:/stream` | The Camel reactive-streams endpoint.
+`reactive-streams:stream` | The Camel reactive-streams endpoint.
 
 | link:camel-restlet/src/main/docs/restlet-component.adoc[Restlet] (camel-restlet) +
 `restlet:protocol:host:port/uriPattern` | Component for consuming and producing Restful resources using Restlet.