You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/09 12:32:24 UTC
camel git commit: camel-reactive-streams: Fix latest strategy which was mistakenly acting as oldest.
Repository: camel
Updated Branches:
refs/heads/master 872082312 -> c3b6595f9
camel-reactive-streams: Fix latest strategy which was mistakenly acting as oldest.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c3b6595f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c3b6595f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c3b6595f
Branch: refs/heads/master
Commit: c3b6595f94d82cb414e50752b5555b5ab2b7970c
Parents: 8720823
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Apr 9 14:32:12 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Apr 9 14:32:12 2017 +0200
----------------------------------------------------------------------
.../ReactiveStreamsBackpressureStrategy.java | 23 ++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c3b6595f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
index db46915..fe23866 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsBackpressureStrategy.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.reactive.streams;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
@@ -44,7 +43,7 @@ public enum ReactiveStreamsBackpressureStrategy {
@Override
public <T> Collection<T> update(Deque<T> buffer, T element) {
if (buffer.size() > 0) {
- return Arrays.asList(element);
+ return Collections.singletonList(element);
} else {
buffer.addLast(element);
return Collections.emptySet();
@@ -61,14 +60,30 @@ public enum ReactiveStreamsBackpressureStrategy {
public <T> Collection<T> update(Deque<T> buffer, T element) {
Collection<T> discarded = Collections.emptySet();
if (buffer.size() > 0) {
- discarded = Arrays.asList(buffer.removeFirst());
+ discarded = Collections.singletonList(buffer.removeLast());
}
buffer.addLast(element);
return discarded;
}
- };
+ },
+ /**
+ * Keeps only the oldest onNext value, overwriting any previous value if the
+ * downstream can't keep up.
+ */
+ OLDEST {
+ @Override
+ public <T> Collection<T> update(Deque<T> buffer, T element) {
+ Collection<T> discarded = Collections.emptySet();
+ if (buffer.size() > 0) {
+ discarded = Collections.singletonList(buffer.removeFirst());
+ }
+
+ buffer.addLast(element);
+ return discarded;
+ }
+ };
/**
* Updates the buffer and returns a list of discarded elements (if any).