You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/01/20 21:46:29 UTC

qpid-jms git commit: QPIDJMS-244 Ensure async delivery allows acks before close

Repository: qpid-jms
Updated Branches:
  refs/heads/master 405341282 -> 6d1adfd22


QPIDJMS-244 Ensure async delivery allows acks before close

When closing, wait for any in-progress async message delivery to complete
so that, in auto-ack mode, the client acknowledges the processed message
before the connection is closed.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6d1adfd2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6d1adfd2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6d1adfd2

Branch: refs/heads/master
Commit: 6d1adfd227dcbe3993cdc753ed202c53ac2a2f77
Parents: 4053412
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jan 20 16:46:04 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jan 20 16:46:04 2017 -0500

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java | 170 ++++++++++++-------
 .../java/org/apache/qpid/jms/JmsSession.java    |   8 +
 .../integration/ConsumerIntegrationTest.java    | 142 ++++++++++++++++
 3 files changed, 256 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6d1adfd2/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index b06270f..ff19388 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -64,6 +64,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     protected volatile JmsMessageAvailableListener availableListener;
     protected final MessageQueue messageQueue;
     protected final Lock lock = new ReentrantLock();
+    protected final Lock dispatchLock = new ReentrantLock();
     protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
     protected final AtomicReference<Throwable> failureCause = new AtomicReference<>();
     protected final MessageDeliverTask deliveryTask = new MessageDeliverTask();
@@ -428,7 +429,6 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     }
 
     private JmsInboundMessageDispatch doAckConsumed(final JmsInboundMessageDispatch envelope) throws JMSException {
-        checkClosed();
         try {
             session.acknowledge(envelope, ACK_TYPE.ACCEPTED);
         } catch (JMSException ex) {
@@ -518,8 +518,10 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     public void start() {
         lock.lock();
         try {
-            this.messageQueue.start();
-            drainMessageQueueToListener();
+            if (!messageQueue.isRunning()) {
+                this.messageQueue.start();
+                drainMessageQueueToListener();
+            }
         } finally {
             lock.unlock();
         }
@@ -530,6 +532,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     }
 
     private void stop(boolean closeMessageQueue) {
+        dispatchLock.lock();
         lock.lock();
         try {
             if (closeMessageQueue) {
@@ -539,6 +542,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
             }
         } finally {
             lock.unlock();
+            dispatchLock.unlock();
         }
     }
 
@@ -562,12 +566,6 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         startConsumerResource();
     }
 
-    void drainMessageQueueToListener() {
-        if (messageListener != null && messageQueue.isRunning()) {
-            session.getDispatcherExecutor().execute(deliveryTask);
-        }
-    }
-
     /**
      * @return the id
      */
@@ -592,15 +590,19 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     public void setMessageListener(MessageListener listener) throws JMSException {
         checkClosed();
 
-        this.messageListener = listener;
-        if (listener != null) {
-            consumerInfo.setListener(true);
-            if (isPullConsumer()){
-                startConsumerResource();
+        dispatchLock.lock();
+        try {
+            messageListener = listener;
+            consumerInfo.setListener(listener != null);
+
+            if (listener != null) {
+                if (isPullConsumer()){
+                    startConsumerResource();
+                }
+                drainMessageQueueToListener();
             }
-            drainMessageQueueToListener();
-        } else {
-            consumerInfo.setListener(false);
+        } finally {
+            dispatchLock.unlock();
         }
     }
 
@@ -713,61 +715,101 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         return false;
     }
 
-    private final class MessageDeliverTask implements Runnable {
-        @Override
-        public void run() {
-            JmsInboundMessageDispatch envelope;
-            while (session.isStarted() && (envelope = messageQueue.dequeueNoWait()) != null) {
-                try {
-                    JmsMessage copy = null;
-
-                    if (consumeExpiredMessage(envelope)) {
-                        LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
-                        doAckExpired(envelope);
-                    } else if (redeliveryExceeded(envelope)) {
-                        LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
-                        doAckUndeliverable(envelope);
+    private void drainMessageQueueToListener() {
+        if (messageListener != null && session.isStarted() && messageQueue.isRunning()) {
+            session.getDispatcherExecutor().execute(new BoundedMessageDeliverTask(messageQueue.size()));
+        }
+    }
+
+    private boolean deliverNextPending() {
+        if (session.isStarted() && messageQueue.isRunning() && messageListener != null) {
+            dispatchLock.lock();
+            try {
+                JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
+                if (envelope == null) {
+                    return false;
+                }
+
+                JmsMessage copy = null;
+
+                if (consumeExpiredMessage(envelope)) {
+                    LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
+                    doAckExpired(envelope);
+                } else if (redeliveryExceeded(envelope)) {
+                    LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
+                    doAckUndeliverable(envelope);
+                } else {
+                    boolean deliveryFailed = false;
+                    boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
+                                              acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
+                    if (autoAckOrDupsOk) {
+                        copy = copy(doAckDelivered(envelope));
                     } else {
-                        boolean deliveryFailed = false;
-                        boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
-                                                  acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
-                        if (autoAckOrDupsOk) {
-                            copy = copy(doAckDelivered(envelope));
-                        } else {
-                            copy = copy(ackFromReceive(envelope));
-                        }
-                        session.clearSessionRecovered();
+                        copy = copy(ackFromReceive(envelope));
+                    }
+                    session.clearSessionRecovered();
 
-                        try {
-                            messageListener.onMessage(copy);
-                        } catch (RuntimeException rte) {
-                            deliveryFailed = true;
-                        }
+                    try {
+                        messageListener.onMessage(copy);
+                    } catch (RuntimeException rte) {
+                        deliveryFailed = true;
+                    }
 
-                        if (autoAckOrDupsOk && !session.isSessionRecovered()) {
-                            if (!deliveryFailed) {
-                                doAckConsumed(envelope);
-                            } else {
-                                doAckReleased(envelope);
-                            }
+                    if (autoAckOrDupsOk && !session.isSessionRecovered()) {
+                        if (!deliveryFailed) {
+                            doAckConsumed(envelope);
+                        } else {
+                            doAckReleased(envelope);
                         }
                     }
-                } catch (Exception e) {
-                    // TODO - There are two cases where we can get an error here, one being
-                    //        and error returned from the attempted ACK that was sent and the
-                    //        other being an error while attempting to copy the incoming message.
-                    //        We need to decide how to respond to these.
-                    session.getConnection().onException(e);
-                } finally {
-                    if (isPullConsumer()) {
-                        try {
-                            startConsumerResource();
-                        } catch (JMSException e) {
-                            LOG.error("Exception during credit replenishment for consumer listener {}", getConsumerId(), e);
-                        }
+                }
+            } catch (Exception e) {
+                // TODO - There are two cases where we can get an error here, one being
+                //        and error returned from the attempted ACK that was sent and the
+                //        other being an error while attempting to copy the incoming message.
+                //        We need to decide how to respond to these.
+                session.getConnection().onException(e);
+            } finally {
+                dispatchLock.unlock();
+
+                if (isPullConsumer()) {
+                    try {
+                        startConsumerResource();
+                    } catch (JMSException e) {
+                        LOG.error("Exception during credit replenishment for consumer listener {}", getConsumerId(), e);
                     }
                 }
             }
         }
+
+        return !messageQueue.isEmpty();
+    }
+
+    private final class BoundedMessageDeliverTask implements Runnable {
+
+        private final int deliveryCount;
+
+        public BoundedMessageDeliverTask(int deliveryCount) {
+            this.deliveryCount = deliveryCount;
+        }
+
+        @Override
+        public void run() {
+            int current = 0;
+
+            while (session.isStarted() && messageQueue.isRunning() && current++ < deliveryCount) {
+                if (!deliverNextPending()) {
+                    return;  // Another task already drained the queue.
+                }
+            }
+        }
+    }
+
+    private final class MessageDeliverTask implements Runnable {
+
+        @Override
+        public void run() {
+            deliverNextPending();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6d1adfd2/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 49729ec..3b34674 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -185,8 +185,15 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             throw new javax.jms.IllegalStateException("Cannot call recover() on a transacted session");
         }
 
+        boolean wasStarted = isStarted();
+        stop();
+
         connection.recover(getSessionId());
         sessionRecovered = true;
+
+        if (wasStarted) {
+            start();
+        }
     }
 
     @Override
@@ -1184,6 +1191,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return transactionContext;
     }
 
+
     boolean isSessionRecovered() {
         return sessionRecovered;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6d1adfd2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 59eec0a..d1cab10 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -1162,4 +1162,146 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testConsumerCloseWaitsForAsyncDeliveryToComplete() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    latch.countDown();
+
+                    LOG.debug("Async consumer got Message: {}", m);
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            });
+
+            boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+            assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testSessionCloseWaitsForAsyncDeliveryToComplete() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    latch.countDown();
+
+                    LOG.debug("Async consumer got Message: {}", m);
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            });
+
+            boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+            assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
+
+            testPeer.expectEnd();
+            session.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testConnectionCloseWaitsForAsyncDeliveryToComplete() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    latch.countDown();
+
+                    LOG.debug("Async consumer got Message: {}", m);
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            });
+
+            boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+            assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org