You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/04/24 18:05:21 UTC

qpid-jms git commit: QPIDJMS-380 Fix race between session delivery and consumer close

Repository: qpid-jms
Updated Branches:
  refs/heads/master 4b97438f5 -> 6c3734267


QPIDJMS-380 Fix race between session delivery and consumer close

Prevent connection consumer from closing while a Session is still in the
process of delivering and acknowledging a message to avoid error when
the session finally acknowledges it.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6c373426
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6c373426
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6c373426

Branch: refs/heads/master
Commit: 6c3734267cc24f51f39a8283cf56ee559bcaa90a
Parents: 4b97438
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Apr 24 14:05:06 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Apr 24 14:05:06 2018 -0400

----------------------------------------------------------------------
 .../apache/qpid/jms/JmsConnectionConsumer.java  | 77 +++++++++++++++++++-
 .../java/org/apache/qpid/jms/JmsSession.java    | 57 +++------------
 .../ConnectionConsumerIntegrationTest.java      | 60 ++++++++++++++-
 3 files changed, 140 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c373426/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
index 030a0b6..f336676 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.jms;
 
+import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
+
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -23,7 +25,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 
 import javax.jms.ConnectionConsumer;
 import javax.jms.IllegalStateException;
@@ -33,8 +38,11 @@ import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
 import org.apache.qpid.jms.meta.JmsResource.ResourceState;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.util.MessageQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +63,7 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
 
     private final Lock stateLock = new ReentrantLock();
     private final Lock dispatchLock = new ReentrantLock();
+    private final ReadWriteLock deliveringLock = new ReentrantReadWriteLock(true);
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
     private final ScheduledThreadPoolExecutor dispatcher;
@@ -131,8 +140,13 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
      * @throws JMSException if an error occurs during the consumer close operation.
      */
     protected void doClose() throws JMSException {
-        shutdown();
-        this.connection.destroyResource(consumerInfo);
+        deliveringLock.writeLock().lock();
+        try {
+            shutdown();
+            this.connection.destroyResource(consumerInfo);
+        } finally {
+            deliveringLock.writeLock().unlock();
+        }
     }
 
     protected void shutdown() throws JMSException {
@@ -250,7 +264,7 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
                 JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
 
                 if (session instanceof JmsSession) {
-                    ((JmsSession) session).enqueueInSession(envelope);
+                    ((JmsSession) session).enqueueInSession(new DeliveryTask(envelope));
                 } else {
                     LOG.warn("ServerSession provided an unknown JMS Session type to this ConnectionConsumer: {}", session);
                 }
@@ -286,4 +300,59 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
             }
         }
     }
-}
\ No newline at end of file
+
+    private final class DeliveryTask implements Consumer<JmsSession> {
+
+        private final JmsInboundMessageDispatch envelope;
+
+        public DeliveryTask(JmsInboundMessageDispatch envelope) {
+            this.envelope = envelope;
+        }
+
+        @Override
+        public void accept(JmsSession session) {
+            deliveringLock.readLock().lock();
+
+            try {
+                if (closed.get()) {
+                    return;  // Message has been released.
+                }
+
+                JmsMessage copy = null;
+
+                if (envelope.getMessage().isExpired()) {
+                    LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope);
+                    session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
+                } else if (session.redeliveryExceeded(envelope)) {
+                    LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope);
+                    JmsRedeliveryPolicy redeliveryPolicy = envelope.getConsumerInfo().getRedeliveryPolicy();
+                    session.acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(envelope.getConsumerInfo().getDestination())));
+                } else {
+                    boolean deliveryFailed = false;
+
+                    copy = session.acknowledge(envelope, ACK_TYPE.DELIVERED).getMessage().copy();
+
+                    session.clearSessionRecovered();
+
+                    try {
+                        session.getMessageListener().onMessage(copy);
+                    } catch (RuntimeException rte) {
+                        deliveryFailed = true;
+                    }
+
+                    if (!session.isSessionRecovered()) {
+                        if (!deliveryFailed) {
+                            session.acknowledge(envelope, ACK_TYPE.ACCEPTED);
+                        } else {
+                            session.acknowledge(envelope, ACK_TYPE.RELEASED);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                getConnection().onAsyncException(e);
+            } finally {
+                deliveringLock.readLock().unlock();
+            }
+        }
+    }
+ }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c373426/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 6152991..77df75c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -16,11 +16,10 @@
  */
 package org.apache.qpid.jms;
 
-import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
-
 import java.io.Serializable;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.Iterator;
@@ -38,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import javax.jms.BytesMessage;
 import javax.jms.CompletionListener;
@@ -94,8 +94,6 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.selector.SelectorParser;
 import org.apache.qpid.jms.selector.filter.FilterException;
-import org.apache.qpid.jms.util.FifoMessageQueue;
-import org.apache.qpid.jms.util.MessageQueue;
 import org.apache.qpid.jms.util.NoOpExecutor;
 import org.apache.qpid.jms.util.QpidJMSThreadFactory;
 import org.slf4j.Logger;
@@ -117,7 +115,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private final Map<JmsProducerId, JmsMessageProducer> producers = new ConcurrentHashMap<JmsProducerId, JmsMessageProducer>();
     private final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>();
     private MessageListener messageListener;
-    private final MessageQueue sessionQueue = new FifoMessageQueue(16);
+
+    private final java.util.Queue<Consumer<JmsSession>> sessionQueue = new ArrayDeque<>();
+
     private final AtomicBoolean closed = new AtomicBoolean();
     private final AtomicBoolean started = new AtomicBoolean();
     private final JmsSessionInfo sessionInfo;
@@ -728,42 +728,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             throw new RuntimeException(ex);
         }
 
-        JmsInboundMessageDispatch envelope = null;
-        while ((envelope = sessionQueue.dequeueNoWait()) != null) {
-            try {
-                JmsMessage copy = null;
-
-                if (envelope.getMessage().isExpired()) {
-                    LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope);
-                    acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
-                } else if (redeliveryExceeded(envelope)) {
-                    LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope);
-                    JmsRedeliveryPolicy redeliveryPolicy = envelope.getConsumerInfo().getRedeliveryPolicy();
-                    acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(envelope.getConsumerInfo().getDestination())));
-                } else {
-                    boolean deliveryFailed = false;
-
-                    copy = acknowledge(envelope, ACK_TYPE.DELIVERED).getMessage().copy();
-
-                    clearSessionRecovered();
-
-                    try {
-                        messageListener.onMessage(copy);
-                    } catch (RuntimeException rte) {
-                        deliveryFailed = true;
-                    }
-
-                    if (!isSessionRecovered()) {
-                        if (!deliveryFailed) {
-                            acknowledge(envelope, ACK_TYPE.ACCEPTED);
-                        } else {
-                            acknowledge(envelope, ACK_TYPE.RELEASED);
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                getConnection().onException(e);
-            }
+        Consumer<JmsSession> dispatcher = null;
+        while ((dispatcher = sessionQueue.poll()) != null) {
+            dispatcher.accept(this);
         }
     }
 
@@ -1108,8 +1075,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             for (JmsMessageConsumer consumer : consumers.values()) {
                 consumer.start();
             }
-
-            sessionQueue.start();
         }
     }
 
@@ -1120,8 +1085,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             consumer.stop();
         }
 
-        sessionQueue.stop();
-
         synchronized (sessionInfo) {
             if (deliveryExecutor != null) {
                 deliveryExecutor.shutdown();
@@ -1423,8 +1386,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
     }
 
-    void enqueueInSession(JmsInboundMessageDispatch envelope) {
-        sessionQueue.enqueue(envelope);
+    void enqueueInSession(Consumer<JmsSession> dispatcher) {
+        sessionQueue.add(dispatcher);
     }
 
     //----- Asynchronous Send Helpers ----------------------------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c373426/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
index f92357d..30885b8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
@@ -133,7 +133,63 @@ public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase {
             }
 
             assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS));
-            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testPauseInOnMessageAndConsumerClosed() throws Exception {
+        final CountDownLatch messageArrived = new CountDownLatch(1);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            testPeer.expectBegin();
+
+            // Create a session for our ServerSessionPool to use
+            Session session = connection.createSession();
+            session.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    messageArrived.countDown();
+
+                    LOG.trace("Pausing onMessage to check for race on connection consumer close");
+
+                    // Pause a bit to see if we race consumer close and our own
+                    // message accept attempt by the delivering Session.
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(10);
+                    } catch (InterruptedException e) {
+                    }
+
+                    LOG.trace("Paused onMessage to check for race on connection consumer close");
+                }
+            });
+            JmsServerSession serverSession = new JmsServerSession(session);
+            JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession);
+
+            // Now the Connection consumer arrives and we give it a message
+            // to be dispatched to the server session.
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            Queue queue = new JmsQueue("myQueue");
+            ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+            connection.start();
+
+            assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS));
 
             testPeer.expectDetach(true, true, true);
             consumer.close();
@@ -240,7 +296,6 @@ public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS));
-            testPeer.waitForAllHandlersToComplete(2000);
 
             testPeer.expectDetach(true, true, true);
             consumer.close();
@@ -303,7 +358,6 @@ public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase {
             connection.start();
 
             assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS));
-            testPeer.waitForAllHandlersToComplete(2000);
 
             testPeer.expectDetach(true, true, true);
             consumer.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org