You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/01/27 14:35:01 UTC
[2/6] camel git commit: CAMEL-10650: some fixes after review
CAMEL-10650: some fixes after review
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/40b42e65
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/40b42e65
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/40b42e65
Branch: refs/heads/master
Commit: 40b42e656479b568d401baebe30e1dbc71c4979b
Parents: e66207a
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Mon Jan 23 10:33:04 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Jan 27 15:31:31 2017 +0100
----------------------------------------------------------------------
.../reactive/streams/ReactiveStreamsConsumer.java | 2 +-
.../reactive/streams/ReactiveStreamsEndpoint.java | 2 +-
.../reactive/streams/ReactiveStreamsProducer.java | 4 +---
.../reactive/streams/api/CamelReactiveStreams.java | 16 ++++++++--------
.../reactive/streams/engine/CamelPublisher.java | 11 +++++++----
.../reactive/streams/engine/StreamPayload.java | 9 ---------
.../streams/engine/UnwrappingPublisher.java | 12 +++++++-----
.../reactive/streams/util/ConvertingPublisher.java | 13 ++++++++-----
8 files changed, 33 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index c345a2b..ca19e0a 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -71,7 +71,7 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
executorService.execute(() -> this.getAsyncProcessor().process(exchange, doneSync -> {
if (exchange.getException() != null) {
- LOG.warn("Error processing the exchange " + exchange + " from reactive-streams", exchange.getException());
+ getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
}
callback.done(doneSync);
http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
index 7d294f1..0e72c52 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsEndpoint.java
@@ -27,7 +27,7 @@ import org.apache.camel.spi.UriPath;
/**
* The Camel reactive-streams endpoint.
*/
-@UriEndpoint(scheme = "reactive-streams", title = "Reactive Streams", syntax = "reactive-streams:/stream",
+@UriEndpoint(scheme = "reactive-streams", title = "Reactive Streams", syntax = "reactive-streams:stream",
consumerClass = ReactiveStreamsConsumer.class, label = "streams")
public class ReactiveStreamsEndpoint extends DefaultEndpoint {
http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
index a8ec559..d74cdb1 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsProducer.java
@@ -43,9 +43,7 @@ public class ReactiveStreamsProducer<T> extends DefaultAsyncProducer {
data.setException(error);
}
- if (callback != null) {
- callback.done(false);
- }
+ callback.done(false);
});
return false;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
index dfa8185..1bafc5c 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreams.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -40,7 +41,7 @@ public final class CamelReactiveStreams {
private static final Logger LOG = LoggerFactory.getLogger(CamelReactiveStreams.class);
- private static Map<CamelContext, CamelReactiveStreams> instances = new HashMap<>();
+ private static Map<CamelContext, CamelReactiveStreams> instances = new ConcurrentHashMap<>();
private CamelReactiveStreamsService service;
@@ -48,17 +49,16 @@ public final class CamelReactiveStreams {
this.service = service;
}
- public static synchronized CamelReactiveStreams get(CamelContext context) {
- if (!instances.containsKey(context)) {
+ public static CamelReactiveStreams get(CamelContext context) {
+ instances.computeIfAbsent(context, ctx -> {
CamelReactiveStreamsService service = resolveReactiveStreamsService(context);
try {
- context.addService(service, true, true);
+ ctx.addService(service, true, true);
} catch (Exception ex) {
throw new IllegalStateException("Cannot add the CamelReactiveStreamsService to the Camel context", ex);
}
-
- instances.put(context, new CamelReactiveStreams(service));
- }
+ return new CamelReactiveStreams(service);
+ });
return instances.get(context);
}
@@ -81,7 +81,7 @@ public final class CamelReactiveStreams {
try {
service = serviceClass.newInstance();
LOG.info("Created reactive stream service from class: " + serviceClass.getName());
- } catch (InstantiationException | IllegalAccessException e) {
+ } catch (Exception e) {
LOG.debug("Unable to create a reactive stream service of class " + serviceClass.getName(), e);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index bf675fb..6a30625 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -70,15 +70,18 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
public void publish(StreamPayload<Exchange> data) {
// freeze the subscriptions
List<CamelSubscription> subs = new LinkedList<>(subscriptions);
+
DispatchCallback<Exchange> originalCallback = data.getCallback();
if (originalCallback != null && subs.size() > 0) {
- // Notify processing once if multiple subscribers are present
- AtomicInteger counter = new AtomicInteger(0);
+ // When multiple subscribers have an active subscription,
+ // we acknowledge the exchange once it has been delivered to every
+ // subscriber (or their subscription is cancelled)
+ AtomicInteger counter = new AtomicInteger(subs.size());
+ // Use just the first exception in the callback when multiple exceptions are thrown
AtomicReference<Throwable> thrown = new AtomicReference<>(null);
data = new StreamPayload<>(data.getItem(), (ex, error) -> {
- int status = counter.incrementAndGet();
thrown.compareAndSet(null, error);
- if (status == subs.size()) {
+ if (counter.decrementAndGet() == 0) {
originalCallback.processed(ex, thrown.get());
}
});
http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java
index b592c40..5a1fcbe 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/StreamPayload.java
@@ -40,13 +40,4 @@ public class StreamPayload<D> {
return callback;
}
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder("StreamPayload{");
- sb.append("item=").append(item);
- sb.append(", callback=").append(callback);
- sb.append('}');
- return sb.toString();
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
index 1cf73cc..85c87a2 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java
@@ -46,16 +46,18 @@ public class UnwrappingPublisher<R> implements Publisher<R> {
private Subscription subscription;
@Override
- public void onSubscribe(Subscription subscription) {
- if (subscription == null) {
+ public void onSubscribe(Subscription newSubscription) {
+ if (newSubscription == null) {
throw new NullPointerException("subscription is null");
+ } else if (newSubscription == this.subscription) {
+ throw new IllegalArgumentException("already subscribed to the subscription: " + newSubscription);
}
if (this.subscription != null) {
- subscription.cancel();
+ newSubscription.cancel();
} else {
- this.subscription = subscription;
- subscriber.onSubscribe(subscription);
+ this.subscription = newSubscription;
+ subscriber.onSubscribe(newSubscription);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/40b42e65/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
index e8c10dd..12ed7df 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java
@@ -53,15 +53,18 @@ public class ConvertingPublisher<R> implements Publisher<R> {
private Subscription subscription;
@Override
- public void onSubscribe(Subscription subscription) {
- if (subscription == null) {
+ public void onSubscribe(Subscription newSubscription) {
+ if (newSubscription == null) {
throw new NullPointerException("subscription is null");
+ } else if (newSubscription == this.subscription) {
+ throw new IllegalArgumentException("already subscribed to the subscription: " + newSubscription);
}
+
if (this.subscription != null) {
- subscription.cancel();
+ newSubscription.cancel();
} else {
- this.subscription = subscription;
- subscriber.onSubscribe(subscription);
+ this.subscription = newSubscription;
+ subscriber.onSubscribe(newSubscription);
}
}