You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/02/03 17:42:09 UTC
[2/3] camel git commit: CAMEL-10612: fix delayed publisher issue on late request
CAMEL-10612: fix delayed publisher issue on late request
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8195f3ed
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8195f3ed
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8195f3ed
Branch: refs/heads/master
Commit: 8195f3ed512d48684482b90900b449a1f40f56ce
Parents: dd25dcc
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Feb 3 17:14:11 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Feb 3 18:41:41 2017 +0100
----------------------------------------------------------------------
.../streams/engine/DelayedMonoPublisher.java | 8 ++++--
.../streams/DelayedMonoPublisherTest.java | 28 ++++++++++++++++++++
2 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8195f3ed/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
index ef8ece1..106ab4f 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
@@ -138,8 +138,11 @@ public class DelayedMonoPublisher<T> implements Publisher<T> {
@Override
public void request(long l) {
- if (terminated) {
- throw new IllegalStateException("The subscription is terminated");
+ synchronized (this) {
+ if (terminated) {
+ // just ignore the request
+ return;
+ }
}
if (l <= 0) {
@@ -153,6 +156,7 @@ public class DelayedMonoPublisher<T> implements Publisher<T> {
}
}
+ flushCycle();
}
public void flush() {
http://git-wip-us.apache.org/repos/asf/camel/blob/8195f3ed/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
index 011225e..9361473 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
@@ -17,15 +17,18 @@
package org.apache.camel.component.reactive.streams;
import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import io.reactivex.Flowable;
import org.apache.camel.component.reactive.streams.engine.DelayedMonoPublisher;
+import org.apache.camel.component.reactive.streams.support.TestSubscriber;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -224,6 +227,31 @@ public class DelayedMonoPublisherTest {
}
}
+ @Test
+ public void testDelayedRequest() throws Exception {
+
+ DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+ pub.setData(2);
+
+ BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
+
+ TestSubscriber<Integer> sub = new TestSubscriber<Integer>() {
+ @Override
+ public void onNext(Integer o) {
+ queue.add(o);
+ }
+ };
+ sub.setInitiallyRequested(0);
+
+ pub.subscribe(sub);
+
+ Thread.sleep(100);
+ sub.request(1);
+
+ Integer res = queue.poll(1, TimeUnit.SECONDS);
+ assertEquals(new Integer(2), res);
+ }
+
@Test(expected = IllegalStateException.class)
public void testDataOrExceptionAllowed() throws Exception {
DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);