You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/01/27 14:35:01 UTC

[2/6] camel git commit: CAMEL-10650: some fixes after review

CAMEL-10650: some fixes after review


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40b42e65
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40b42e65
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40b42e65

Branch: refs/heads/master
Commit: 40b42e656479b568d401baebe30e1dbc71c4979b
Parents: e66207a
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Mon Jan 23 10:33:04 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Jan 27 15:31:31 2017 +0100

----------------------------------------------------------------------
 .../reactive/streams/ReactiveStreamsConsumer.java   |  2 +-
 .../reactive/streams/ReactiveStreamsEndpoint.java   |  2 +-
 .../reactive/streams/ReactiveStreamsProducer.java   |  4 +---
 .../reactive/streams/api/CamelReactiveStreams.java  | 16 ++++++++--------
 .../reactive/streams/engine/CamelPublisher.java     | 11 +++++++----
 .../reactive/streams/engine/StreamPayload.java      |  9 ---------
 .../streams/engine/UnwrappingPublisher.java         | 12 +++++++-----
 .../reactive/streams/util/ConvertingPublisher.java  | 13 ++++++++-----
 8 files changed, 33 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index c345a2b..ca19e0a 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -71,7 +71,7 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
 
             executorService.execute(() -> this.getAsyncProcessor().process(exchange, doneSync -> {
                 if (exchange.getException() != null) {
-                    LOG.warn("Error processing the exchange " + exchange + " from reactive-streams", exchange.getException());
+                    getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                 }
 
                 callback.done(doneSync);

http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
index 7d294f1..0e72c52 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
@@ -27,7 +27,7 @@ import org.apache.camel.spi.UriPath;
 /**
  * The Camel reactive-streams endpoint.
  */
-@UriEndpoint(scheme = "reactive-streams", title = "Reactive Streams", syntax = "reactive-streams:/stream",
+@UriEndpoint(scheme = "reactive-streams", title = "Reactive Streams", syntax = "reactive-streams:stream",
         consumerClass = ReactiveStreamsConsumer.class, label = "streams")
 public class ReactiveStreamsEndpoint extends DefaultEndpoint {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
index a8ec559..d74cdb1 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
@@ -43,9 +43,7 @@ public class ReactiveStreamsProducer<T> extends DefaultAsyncProducer {
                 data.setException(error);
             }
 
-            if (callback != null) {
-                callback.done(false);
-            }
+            callback.done(false);
         });
         return false;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
index dfa8185..1bafc5c 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -40,7 +41,7 @@ public final class CamelReactiveStreams {
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelReactiveStreams.class);
 
-    private static Map<CamelContext, CamelReactiveStreams> instances = new HashMap<>();
+    private static Map<CamelContext, CamelReactiveStreams> instances = new ConcurrentHashMap<>();
 
     private CamelReactiveStreamsService service;
 
@@ -48,17 +49,16 @@ public final class CamelReactiveStreams {
         this.service = service;
     }
 
-    public static synchronized CamelReactiveStreams get(CamelContext context) {
-        if (!instances.containsKey(context)) {
+    public static CamelReactiveStreams get(CamelContext context) {
+        instances.computeIfAbsent(context, ctx -> {
             CamelReactiveStreamsService service = resolveReactiveStreamsService(context);
             try {
-                context.addService(service, true, true);
+                ctx.addService(service, true, true);
             } catch (Exception ex) {
                 throw new IllegalStateException("Cannot add the CamelReactiveStreamsService to the Camel context", ex);
             }
-
-            instances.put(context, new CamelReactiveStreams(service));
-        }
+            return new CamelReactiveStreams(service);
+        });
 
         return instances.get(context);
     }
@@ -81,7 +81,7 @@ public final class CamelReactiveStreams {
             try {
                 service = serviceClass.newInstance();
                 LOG.info("Created reactive stream service from class: " + serviceClass.getName());
-            } catch (InstantiationException | IllegalAccessException e) {
+            } catch (Exception e) {
                 LOG.debug("Unable to create a reactive stream service of class " + serviceClass.getName(), e);
             }
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index bf675fb..6a30625 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -70,15 +70,18 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
     public void publish(StreamPayload<Exchange> data) {
         // freeze the subscriptions
         List<CamelSubscription> subs = new LinkedList<>(subscriptions);
+
         DispatchCallback<Exchange> originalCallback = data.getCallback();
         if (originalCallback != null && subs.size() > 0) {
-            // Notify processing once if multiple subscribers are present
-            AtomicInteger counter = new AtomicInteger(0);
+            // When multiple subscribers have an active subscription,
+            // we acknowledge the exchange once it has been delivered to every
+            // subscriber (or their subscription is cancelled)
+            AtomicInteger counter = new AtomicInteger(subs.size());
+            // Use just the first exception in the callback when multiple exceptions are thrown
             AtomicReference<Throwable> thrown = new AtomicReference<>(null);
             data = new StreamPayload<>(data.getItem(), (ex, error) -> {
-                int status = counter.incrementAndGet();
                 thrown.compareAndSet(null, error);
-                if (status == subs.size()) {
+                if (counter.decrementAndGet() == 0) {
                     originalCallback.processed(ex, thrown.get());
                 }
             });

http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java
index b592c40..5a1fcbe 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java
@@ -40,13 +40,4 @@ public class StreamPayload<D> {
         return callback;
     }
 
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder("StreamPayload{");
-        sb.append("item=").append(item);
-        sb.append(", callback=").append(callback);
-        sb.append('}');
-        return sb.toString();
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
index 1cf73cc..85c87a2 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
@@ -46,16 +46,18 @@ public class UnwrappingPublisher<R> implements Publisher<R> {
             private Subscription subscription;
 
             @Override
-            public void onSubscribe(Subscription subscription) {
-                if (subscription == null) {
+            public void onSubscribe(Subscription newSubscription) {
+                if (newSubscription == null) {
                     throw new NullPointerException("subscription is null");
+                } else if (newSubscription == this.subscription) {
+                    throw new IllegalArgumentException("already subscribed to the subscription: " + newSubscription);
                 }
 
                 if (this.subscription != null) {
-                    subscription.cancel();
+                    newSubscription.cancel();
                 } else {
-                    this.subscription = subscription;
-                    subscriber.onSubscribe(subscription);
+                    this.subscription = newSubscription;
+                    subscriber.onSubscribe(newSubscription);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
index e8c10dd..12ed7df 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
@@ -53,15 +53,18 @@ public class ConvertingPublisher<R> implements Publisher<R> {
             private Subscription subscription;
 
             @Override
-            public void onSubscribe(Subscription subscription) {
-                if (subscription == null) {
+            public void onSubscribe(Subscription newSubscription) {
+                if (newSubscription == null) {
                     throw new NullPointerException("subscription is null");
+                } else if (newSubscription == this.subscription) {
+                    throw new IllegalArgumentException("already subscribed to the subscription: " + newSubscription);
                 }
+
                 if (this.subscription != null) {
-                    subscription.cancel();
+                    newSubscription.cancel();
                 } else {
-                    this.subscription = subscription;
-                    subscriber.onSubscribe(subscription);
+                    this.subscription = newSubscription;
+                    subscriber.onSubscribe(newSubscription);
                 }
             }