You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/09 12:32:24 UTC

camel git commit: camel-reactive-streams: Fix latest strategy which was mistakenly acting as oldest.

Repository: camel
Updated Branches:
  refs/heads/master 872082312 -> c3b6595f9


camel-reactive-streams: Fix latest strategy which was mistakenly acting as oldest.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c3b6595f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c3b6595f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c3b6595f

Branch: refs/heads/master
Commit: c3b6595f94d82cb414e50752b5555b5ab2b7970c
Parents: 8720823
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 9 14:32:12 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 9 14:32:12 2017 +0200

----------------------------------------------------------------------
 .../ReactiveStreamsBackpressureStrategy.java    | 23 ++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c3b6595f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
index db46915..fe23866 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.reactive.streams;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
@@ -44,7 +43,7 @@ public enum ReactiveStreamsBackpressureStrategy {
         @Override
         public <T> Collection<T> update(Deque<T> buffer, T element) {
             if (buffer.size() > 0) {
-                return Arrays.asList(element);
+                return Collections.singletonList(element);
             } else {
                 buffer.addLast(element);
                 return Collections.emptySet();
@@ -61,14 +60,30 @@ public enum ReactiveStreamsBackpressureStrategy {
         public <T> Collection<T> update(Deque<T> buffer, T element) {
             Collection<T> discarded = Collections.emptySet();
             if (buffer.size() > 0) {
-                discarded = Arrays.asList(buffer.removeFirst());
+                discarded = Collections.singletonList(buffer.removeLast());
             }
 
             buffer.addLast(element);
             return discarded;
         }
-    };
+    },
 
+    /**
+     * Keeps only the oldest onNext value, overwriting any previous value if the
+     * downstream can't keep up.
+     */
+    OLDEST {
+        @Override
+        public <T> Collection<T> update(Deque<T> buffer, T element) {
+            Collection<T> discarded = Collections.emptySet();
+            if (buffer.size() > 0) {
+                discarded = Collections.singletonList(buffer.removeFirst());
+            }
+
+            buffer.addLast(element);
+            return discarded;
+        }
+    };
 
     /**
      * Updates the buffer and returns a list of discarded elements (if any).