You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/01/31 10:34:38 UTC

[2/3] camel git commit: CAMEL-10612: refactoring backpressure strategy

CAMEL-10612: refactoring backpressure strategy


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f360260
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f360260
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f360260

Branch: refs/heads/master
Commit: 8f36026009c6f101e81b82010779591b2e73b5fe
Parents: b64250b
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Mon Jan 30 17:40:31 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Tue Jan 31 11:33:57 2017 +0100

----------------------------------------------------------------------
 .../main/docs/reactive-streams-component.adoc   | 11 +++++
 .../ReactiveStreamsBackpressureStrategy.java    | 49 ++++++++++++++++++--
 .../streams/engine/CamelSubscription.java       | 45 +++++++-----------
 3 files changed, 75 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8f360260/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
index 6d04b2c..715e2e6 100644
--- a/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
+++ b/components/camel-reactive-streams/src/main/docs/reactive-streams-component.adoc
@@ -193,6 +193,17 @@ In other circumstances, eg. when using a `http` consumer, the route suspension m
 using the default configuration (no policy, unbounded buffer) should be preferable. Users should try to avoid memory issues
 by limiting the number of requests to the http service (eg. scaling out).
 
+In contexts where a certain amount of data loss is acceptable, setting a backpressure strategy other than `BUFFER` can
+ be a solution for dealing with fast sources.
+
+[source,java]
+---------------------------------------------------------
+from("direct:thermostat")
+.to("reactive-streams:flow?backpressureStrategy=LATEST");
+---------------------------------------------------------
+
+When the `LATEST` backpressure strategy is used, only the last exchange received from the route is kept by the publisher, while older data is discarded (other options are available).
+
 ### Controlling Backpressure (consumer side)
 
 When Camel consumes items from a reactive-streams publisher, the maximum number of inflight exchanges can be set as endpoint option.

http://git-wip-us.apache.org/repos/asf/camel/blob/8f360260/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
index 823a7b8..db46915 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
@@ -16,6 +16,11 @@
  */
 package org.apache.camel.component.reactive.streams;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+
 /**
  * A list of possible backpressure strategy to use when the emission of upstream items cannot respect backpressure.
  */
@@ -24,17 +29,55 @@ public enum ReactiveStreamsBackpressureStrategy {
     /**
      * Buffers <em>all</em> onNext values until the downstream consumes it.
      */
-    BUFFER,
+    BUFFER {
+        @Override
+        public <T> Collection<T> update(Deque<T> buffer, T element) {
+            buffer.addLast(element);
+            return Collections.emptySet();
+        }
+    },
 
     /**
      * Drops the most recent onNext value if the downstream can't keep up.
      */
-    DROP,
+    DROP {
+        @Override
+        public <T> Collection<T> update(Deque<T> buffer, T element) {
+            if (buffer.size() > 0) {
+                return Arrays.asList(element);
+            } else {
+                buffer.addLast(element);
+                return Collections.emptySet();
+            }
+        }
+    },
 
     /**
      * Keeps only the latest onNext value, overwriting any previous value if the
      * downstream can't keep up.
      */
-    LATEST
+    LATEST {
+        @Override
+        public <T> Collection<T> update(Deque<T> buffer, T element) {
+            Collection<T> discarded = Collections.emptySet();
+            if (buffer.size() > 0) {
+                discarded = Arrays.asList(buffer.removeFirst());
+            }
+
+            buffer.addLast(element);
+            return discarded;
+        }
+    };
+
+
+    /**
+     * Updates the buffer and returns a list of discarded elements (if any).
+     *
+     * @param buffer the buffer to update
+     * @param element the elment that should possibly be inserted
+     * @param <T> the generic type of the element
+     * @return the list of discarded elements
+     */
+    public abstract <T> Collection<T> update(Deque<T> buffer, T element);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8f360260/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index d5f4337..16c4eea 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -16,8 +16,12 @@
  */
 package org.apache.camel.component.reactive.streams.engine;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -205,44 +209,31 @@ public class CamelSubscription implements Subscription {
     }
 
     public void publish(StreamPayload<Exchange> message) {
-        StreamPayload<Exchange> discardedMessage = null;
-        String discardReason = null;
+        Map<StreamPayload<Exchange>, String> discardedMessages = null;
         try {
             mutex.lock();
             if (!this.terminating && !this.terminated) {
-                if (this.backpressureStrategy == ReactiveStreamsBackpressureStrategy.BUFFER) {
-                    buffer.addLast(message);
-                } else if (this.backpressureStrategy == ReactiveStreamsBackpressureStrategy.DROP) {
-                    if (buffer.size() > 0) {
-                        LOG.warn("Exchange " + message.getItem() + " dropped according to the backpressure strategy " + ReactiveStreamsBackpressureStrategy.DROP);
-                        discardedMessage = message;
-                        discardReason = "the backpressure strategy (DROP) does not allow buffering";
-                    } else {
-                        buffer.addLast(message);
+                Collection<StreamPayload<Exchange>> discarded = this.backpressureStrategy.update(buffer, message);
+                if (discarded.iterator().hasNext()) {
+                    discardedMessages = new HashMap<>();
+                    for (StreamPayload<Exchange> ex : discarded) {
+                        discardedMessages.put(ex, "Exchange " + ex.getItem() + " discarded by backpressure strategy " + this.backpressureStrategy);
                     }
-                } else if (this.backpressureStrategy == ReactiveStreamsBackpressureStrategy.LATEST) {
-                    if (buffer.size() > 0) {
-                        StreamPayload<Exchange> older = buffer.removeFirst();
-                        LOG.warn("Exchange " + message.getItem() + " dropped according to the backpressure strategy " + ReactiveStreamsBackpressureStrategy.LATEST);
-                        discardedMessage = older;
-                        discardReason = "the backpressure strategy (LATEST) does not allow buffering";
-                    }
-                    buffer.addLast(message);
-                } else {
-                    throw new IllegalStateException("Unsupported backpressure strategy: " + this.backpressureStrategy);
                 }
-
             } else {
-                discardedMessage = message;
-                discardReason = "subscription closed";
+                // acknowledge
+                discardedMessages = Collections.singletonMap(message, "Exchange " + message.getItem() + " discarded: subscription closed");
             }
         } finally {
             mutex.unlock();
         }
 
-        if (discardedMessage != null) {
-            // acknowledge
-            discardedMessage.getCallback().processed(message.getItem(), new IllegalStateException("Exchange discarded: " + discardReason));
+        // discarding outside of mutex scope
+        if (discardedMessages != null) {
+            for (Map.Entry<StreamPayload<Exchange>, String> discarded : discardedMessages.entrySet()) {
+                StreamPayload<Exchange> m = discarded.getKey();
+                m.getCallback().processed(m.getItem(), new IllegalStateException(discarded.getValue()));
+            }
         }
 
         checkAndFlush();