You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/11/11 22:30:36 UTC

qpid-jms git commit: QPIDJMS-221 Ensure pending consumer requests are unblocked on close

Repository: qpid-jms
Updated Branches:
  refs/heads/master 952de60ae -> 2a201375f


QPIDJMS-221 Ensure pending consumer requests are unblocked on close

When the consumer is closed, or its parent session or connection is
closed, it should unblock any pending drain or stop requests.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2a201375
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2a201375
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2a201375

Branch: refs/heads/master
Commit: 2a201375ffeea39da43db811eb57bc293fb8d4b4
Parents: 952de60
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Nov 11 17:30:27 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Nov 11 17:30:27 2016 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 12 +++
 .../ZeroPrefetchIntegrationTest.java            | 85 ++++++++++++++++++++
 2 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2a201375/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 30e1cef..02d9b47 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -572,6 +572,18 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         JmsConsumerInfo consumerInfo = getResourceInfo();
 
         subTracker.consumerRemoved(consumerInfo);
+
+        // When closed we need to release any pending tasks to avoid blocking
+
+        if (stopRequest != null) {
+            stopRequest.onSuccess();
+            stopRequest = null;
+        }
+
+        if (pullRequest != null) {
+            pullRequest.onSuccess();
+            pullRequest = null;
+        }
     }
 
     //----- Inner classes used in message pull operations --------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2a201375/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index 29a1d46..24e133b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -25,7 +25,10 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Date;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -89,6 +92,9 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
             assertTrue(m instanceof TextMessage);
             assertEquals("Unexpected message content", liveMsgContent, ((TextMessage) m).getText());
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
@@ -123,6 +129,9 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
             assertTrue(m instanceof TextMessage);
             assertEquals("Unexpected message content", msgContent, ((TextMessage) m).getText());
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
@@ -187,4 +196,80 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout=40000)
+    public void testZeroPrefetchConsumerReceiveUnblockedOnSessionClose() throws Exception {
+        doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(0);
+    }
+
+    @Test(timeout=40000)
+    public void testZeroPrefetchConsumerReceiveTimedUnblockedOnSessionClose() throws Exception {
+        doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(1);
+    }
+
+    @Test(timeout=40000)
+    public void testZeroPrefetchConsumerReceiveNoWaitUnblockedOnSessionClose() throws Exception {
+        doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(-1);
+    }
+
+    public void doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(final int timeout) throws Exception {
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Create a connection with zero prefetch
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the consumer to attach but NOT send credit
+            testPeer.expectReceiverAttach();
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            // Expect that once receive is called, it drains with 1 credit, don't answer it
+            if (timeout < 0) {
+                testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ONE));
+            } else {
+                testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE));
+            }
+
+            final AtomicBoolean error = new AtomicBoolean(false);
+            final CountDownLatch done = new CountDownLatch(1);
+
+            ExecutorService executor = Executors.newSingleThreadExecutor();
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        if (timeout < 0) {
+                            consumer.receiveNoWait();
+                        } else if (timeout == 0) {
+                            consumer.receive();
+                        } else {
+                            consumer.receive(10000);
+                        }
+                    } catch (Exception ex) {
+                        error.set(true);
+                    } finally {
+                        done.countDown();
+                    }
+                }
+            });
+
+            testPeer.waitForAllHandlersToComplete(3000);
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            session.close();
+
+            assertTrue("Consumer did not unblock", done.await(10, TimeUnit.SECONDS));
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org