You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/09 12:00:40 UTC

camel git commit: CAMEL-11124: camel-reactive-streams - Add exception for discarded streams.

Repository: camel
Updated Branches:
  refs/heads/master 96bbc91cf -> 872082312


CAMEL-11124: camel-reactive-streams - Add exception for discarded streams.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/87208231
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/87208231
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/87208231

Branch: refs/heads/master
Commit: 87208231252a330fc0f3c728a6f3358702c326db
Parents: 96bbc91
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 9 13:55:26 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 9 13:55:26 2017 +0200

----------------------------------------------------------------------
 .../main/docs/reactive-streams-component.adoc   |  4 +--
 .../streams/ReactiveStreamsComponent.java       |  3 ++
 .../ReactiveStreamsDiscardedException.java      | 35 ++++++++++++++++++++
 .../reactive/streams/engine/CamelPublisher.java |  2 +-
 .../streams/engine/CamelSubscription.java       | 12 +++++--
 .../ReactiveStreamsComponentConfiguration.java  |  3 +-
 6 files changed, 53 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index 0c6fc8a..07535d6 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -51,8 +51,8 @@ The Reactive Streams component supports 3 options which are listed below.
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
 | Name | Description | Default | Type
-| **internalEngine Configuration** (common) | Configures the internal engine for Reactive Streams. |  | ReactiveStreamsEngine Configuration
-| **backpressureStrategy** (common) | The backpressure strategy to use when pushing events to a slow subscriber. |  | ReactiveStreams BackpressureStrategy
+| **internalEngine Configuration** (advanced) | Configures the internal engine for Reactive Streams. |  | ReactiveStreamsEngine Configuration
+| **backpressureStrategy** (producer) | The backpressure strategy to use when pushing events to a slow subscriber. | BUFFER | ReactiveStreams BackpressureStrategy
 | **resolveProperty Placeholders** (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
 |=======================================================================
 // component options: END

http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
index 5801877..a80f648 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
@@ -21,14 +21,17 @@ import java.util.Map;
 import org.apache.camel.Endpoint;
 import org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration;
 import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.spi.Metadata;
 
 /**
  * The Camel reactive-streams component.
  */
 public class ReactiveStreamsComponent extends UriEndpointComponent {
 
+    @Metadata(label = "advanced")
     private ReactiveStreamsEngineConfiguration internalEngineConfiguration = new ReactiveStreamsEngineConfiguration();
 
+    @Metadata(label = "producer", defaultValue = "BUFFER")
     private ReactiveStreamsBackpressureStrategy backpressureStrategy = ReactiveStreamsBackpressureStrategy.BUFFER;
 
     public ReactiveStreamsComponent() {

http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java
new file mode 100644
index 0000000..0ad811a
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+
+public class ReactiveStreamsDiscardedException extends CamelExchangeException {
+
+    private final String name;
+
+    public ReactiveStreamsDiscardedException(String message, Exchange exchange, String name) {
+        super(message, exchange);
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index f90f19c..4544b7d 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -62,7 +62,7 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
     @Override
     public void subscribe(Subscriber<? super StreamPayload<Exchange>> subscriber) {
         Objects.requireNonNull(subscriber, "subscriber must not be null");
-        CamelSubscription sub = new CamelSubscription(workerPool, this, this.backpressureStrategy, subscriber);
+        CamelSubscription sub = new CamelSubscription(workerPool, this, name, this.backpressureStrategy, subscriber);
         this.subscriptions.add(sub);
         subscriber.onSubscribe(sub);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index 431ca6d..60b42a3 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsDiscardedException;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 import org.slf4j.Logger;
@@ -43,6 +44,8 @@ public class CamelSubscription implements Subscription {
 
     private ExecutorService workerPool;
 
+    private String name;
+
     private CamelPublisher publisher;
 
     private ReactiveStreamsBackpressureStrategy backpressureStrategy;
@@ -78,9 +81,12 @@ public class CamelSubscription implements Subscription {
     private boolean sending;
 
 
-    public CamelSubscription(ExecutorService workerPool, CamelPublisher publisher, ReactiveStreamsBackpressureStrategy backpressureStrategy, Subscriber<? super StreamPayload<Exchange>> subscriber) {
+    public CamelSubscription(ExecutorService workerPool, CamelPublisher publisher, String name,
+                             ReactiveStreamsBackpressureStrategy backpressureStrategy,
+                             Subscriber<? super StreamPayload<Exchange>> subscriber) {
         this.workerPool = workerPool;
         this.publisher = publisher;
+        this.name = name;
         this.backpressureStrategy = backpressureStrategy;
         this.subscriber = subscriber;
     }
@@ -232,7 +238,9 @@ public class CamelSubscription implements Subscription {
         if (discardedMessages != null) {
             for (Map.Entry<StreamPayload<Exchange>, String> discarded : discardedMessages.entrySet()) {
                 StreamPayload<Exchange> m = discarded.getKey();
-                m.getCallback().processed(m.getItem(), new IllegalStateException(discarded.getValue()));
+                Exchange exchange = m.getItem();
+                ReactiveStreamsDiscardedException e = new ReactiveStreamsDiscardedException("Discarded by backpressure strategy", exchange, name);
+                m.getCallback().processed(exchange, e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java
index b33267f..5fb6dd4 100644
--- a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.reactive.streams.springboot;
 
 import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
 /**
@@ -35,7 +36,7 @@ public class ReactiveStreamsComponentConfiguration {
      * The backpressure strategy to use when pushing events to a slow
      * subscriber.
      */
-    private ReactiveStreamsBackpressureStrategy backpressureStrategy;
+    private ReactiveStreamsBackpressureStrategy backpressureStrategy = ReactiveStreamsBackpressureStrategy.BUFFER;
     /**
      * Whether the component should resolve property placeholders on itself when
      * starting. Only properties which are of String type can use property