You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/09 12:00:40 UTC
camel git commit: CAMEL-11124: camel-reactive-streams - Add exception
for discarded streams.
Repository: camel
Updated Branches:
refs/heads/master 96bbc91cf -> 872082312
CAMEL-11124: camel-reactive-streams - Add exception for discarded streams.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/87208231
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/87208231
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/87208231
Branch: refs/heads/master
Commit: 87208231252a330fc0f3c728a6f3358702c326db
Parents: 96bbc91
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 9 13:55:26 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 9 13:55:26 2017 +0200
----------------------------------------------------------------------
.../main/docs/reactive-streams-component.adoc | 4 +--
.../streams/ReactiveStreamsComponent.java | 3 ++
.../ReactiveStreamsDiscardedException.java | 35 ++++++++++++++++++++
.../reactive/streams/engine/CamelPublisher.java | 2 +-
.../streams/engine/CamelSubscription.java | 12 +++++--
.../ReactiveStreamsComponentConfiguration.java | 3 +-
6 files changed, 53 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index 0c6fc8a..07535d6 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -51,8 +51,8 @@ The Reactive Streams component supports 3 options which are listed below.
[width="100%",cols="2,5,^1,2",options="header"]
|=======================================================================
| Name | Description | Default | Type
-| **internalEngine Configuration** (common) | Configures the internal engine for Reactive Streams. | | ReactiveStreamsEngine Configuration
-| **backpressureStrategy** (common) | The backpressure strategy to use when pushing events to a slow subscriber. | | ReactiveStreams BackpressureStrategy
+| **internalEngine Configuration** (advanced) | Configures the internal engine for Reactive Streams. | | ReactiveStreamsEngine Configuration
+| **backpressureStrategy** (producer) | The backpressure strategy to use when pushing events to a slow subscriber. | BUFFER | ReactiveStreams BackpressureStrategy
| **resolveProperty Placeholders** (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
|=======================================================================
// component options: END
http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
index 5801877..a80f648 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsComponent.java
@@ -21,14 +21,17 @@ import java.util.Map;
import org.apache.camel.Endpoint;
import org.apache.camel.component.reactive.streams.engine.ReactiveStreamsEngineConfiguration;
import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.spi.Metadata;
/**
* The Camel reactive-streams component.
*/
public class ReactiveStreamsComponent extends UriEndpointComponent {
+ @Metadata(label = "advanced")
private ReactiveStreamsEngineConfiguration internalEngineConfiguration = new ReactiveStreamsEngineConfiguration();
+ @Metadata(label = "producer", defaultValue = "BUFFER")
private ReactiveStreamsBackpressureStrategy backpressureStrategy = ReactiveStreamsBackpressureStrategy.BUFFER;
public ReactiveStreamsComponent() {
http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java
new file mode 100644
index 0000000..0ad811a
--- /dev/null
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsDiscardedException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+
+public class ReactiveStreamsDiscardedException extends CamelExchangeException {
+
+ private final String name;
+
+ public ReactiveStreamsDiscardedException(String message, Exchange exchange, String name) {
+ super(message, exchange);
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index f90f19c..4544b7d 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -62,7 +62,7 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC
@Override
public void subscribe(Subscriber<? super StreamPayload<Exchange>> subscriber) {
Objects.requireNonNull(subscriber, "subscriber must not be null");
- CamelSubscription sub = new CamelSubscription(workerPool, this, this.backpressureStrategy, subscriber);
+ CamelSubscription sub = new CamelSubscription(workerPool, this, name, this.backpressureStrategy, subscriber);
this.subscriptions.add(sub);
subscriber.onSubscribe(sub);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index 431ca6d..60b42a3 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsDiscardedException;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
@@ -43,6 +44,8 @@ public class CamelSubscription implements Subscription {
private ExecutorService workerPool;
+ private String name;
+
private CamelPublisher publisher;
private ReactiveStreamsBackpressureStrategy backpressureStrategy;
@@ -78,9 +81,12 @@ public class CamelSubscription implements Subscription {
private boolean sending;
- public CamelSubscription(ExecutorService workerPool, CamelPublisher publisher, ReactiveStreamsBackpressureStrategy backpressureStrategy, Subscriber<? super StreamPayload<Exchange>> subscriber) {
+ public CamelSubscription(ExecutorService workerPool, CamelPublisher publisher, String name,
+ ReactiveStreamsBackpressureStrategy backpressureStrategy,
+ Subscriber<? super StreamPayload<Exchange>> subscriber) {
this.workerPool = workerPool;
this.publisher = publisher;
+ this.name = name;
this.backpressureStrategy = backpressureStrategy;
this.subscriber = subscriber;
}
@@ -232,7 +238,9 @@ public class CamelSubscription implements Subscription {
if (discardedMessages != null) {
for (Map.Entry<StreamPayload<Exchange>, String> discarded : discardedMessages.entrySet()) {
StreamPayload<Exchange> m = discarded.getKey();
- m.getCallback().processed(m.getItem(), new IllegalStateException(discarded.getValue()));
+ Exchange exchange = m.getItem();
+ ReactiveStreamsDiscardedException e = new ReactiveStreamsDiscardedException("Discarded by backpressure strategy", exchange, name);
+ m.getCallback().processed(exchange, e);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/87208231/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java
index b33267f..5fb6dd4 100644
--- a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/main/java/org/apache/camel/component/reactive/streams/springboot/ReactiveStreamsComponentConfiguration.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.reactive.streams.springboot;
import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
@@ -35,7 +36,7 @@ public class ReactiveStreamsComponentConfiguration {
* The backpressure strategy to use when pushing events to a slow
* subscriber.
*/
- private ReactiveStreamsBackpressureStrategy backpressureStrategy;
+ private ReactiveStreamsBackpressureStrategy backpressureStrategy = ReactiveStreamsBackpressureStrategy.BUFFER;
/**
* Whether the component should resolve property placeholders on itself when
* starting. Only properties which are of String type can use property