You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/02/03 17:42:09 UTC

[2/3] camel git commit: CAMEL-10612: fix delayed publisher issue on late request

CAMEL-10612: fix delayed publisher issue on late request


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8195f3ed
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8195f3ed
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8195f3ed

Branch: refs/heads/master
Commit: 8195f3ed512d48684482b90900b449a1f40f56ce
Parents: dd25dcc
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Feb 3 17:14:11 2017 +0100
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Fri Feb 3 18:41:41 2017 +0100

----------------------------------------------------------------------
 .../streams/engine/DelayedMonoPublisher.java    |  8 ++++--
 .../streams/DelayedMonoPublisherTest.java       | 28 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8195f3ed/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
index ef8ece1..106ab4f 100644
--- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
+++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java
@@ -138,8 +138,11 @@ public class DelayedMonoPublisher<T> implements Publisher<T> {
 
         @Override
         public void request(long l) {
-            if (terminated) {
-                throw new IllegalStateException("The subscription is terminated");
+            synchronized (this) {
+                if (terminated) {
+                    // just ignore the request
+                    return;
+                }
             }
 
             if (l <= 0) {
@@ -153,6 +156,7 @@ public class DelayedMonoPublisher<T> implements Publisher<T> {
                 }
             }
 
+            flushCycle();
         }
 
         public void flush() {

http://git-wip-us.apache.org/repos/asf/camel/blob/8195f3ed/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
index 011225e..9361473 100644
--- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
+++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DelayedMonoPublisherTest.java
@@ -17,15 +17,18 @@
 package org.apache.camel.component.reactive.streams;
 
 import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import io.reactivex.Flowable;
 
 import org.apache.camel.component.reactive.streams.engine.DelayedMonoPublisher;
+import org.apache.camel.component.reactive.streams.support.TestSubscriber;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -224,6 +227,31 @@ public class DelayedMonoPublisherTest {
         }
     }
 
+    @Test
+    public void testDelayedRequest() throws Exception {
+
+        DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);
+        pub.setData(2);
+
+        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
+
+        TestSubscriber<Integer> sub = new TestSubscriber<Integer>() {
+            @Override
+            public void onNext(Integer o) {
+                queue.add(o);
+            }
+        };
+        sub.setInitiallyRequested(0);
+
+        pub.subscribe(sub);
+
+        Thread.sleep(100);
+        sub.request(1);
+
+        Integer res = queue.poll(1, TimeUnit.SECONDS);
+        assertEquals(new Integer(2), res);
+    }
+
     @Test(expected = IllegalStateException.class)
     public void testDataOrExceptionAllowed() throws Exception {
         DelayedMonoPublisher<Integer> pub = new DelayedMonoPublisher<>(service);