You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/04/24 18:05:21 UTC
qpid-jms git commit: QPIDJMS-380 Fix race between session delivery and consumer close
Repository: qpid-jms
Updated Branches:
refs/heads/master 4b97438f5 -> 6c3734267
QPIDJMS-380 Fix race between session delivery and consumer close
Prevent the connection consumer from closing while a Session is still in the
process of delivering and acknowledging a message, to avoid an error when
the session finally acknowledges it.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6c373426
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6c373426
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6c373426
Branch: refs/heads/master
Commit: 6c3734267cc24f51f39a8283cf56ee559bcaa90a
Parents: 4b97438
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Apr 24 14:05:06 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Apr 24 14:05:06 2018 -0400
----------------------------------------------------------------------
.../apache/qpid/jms/JmsConnectionConsumer.java | 77 +++++++++++++++++++-
.../java/org/apache/qpid/jms/JmsSession.java | 57 +++------------
.../ConnectionConsumerIntegrationTest.java | 60 ++++++++++++++-
3 files changed, 140 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c373426/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
index 030a0b6..f336676 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.jms;
+import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
+
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -23,7 +25,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
@@ -33,8 +38,11 @@ import javax.jms.ServerSessionPool;
import javax.jms.Session;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.meta.JmsResource.ResourceState;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.util.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +63,7 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
private final Lock stateLock = new ReentrantLock();
private final Lock dispatchLock = new ReentrantLock();
+ private final ReadWriteLock deliveringLock = new ReentrantReadWriteLock(true);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
private final ScheduledThreadPoolExecutor dispatcher;
@@ -131,8 +140,13 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
* @throws JMSException if an error occurs during the consumer close operation.
*/
protected void doClose() throws JMSException {
- shutdown();
- this.connection.destroyResource(consumerInfo);
+ deliveringLock.writeLock().lock();
+ try {
+ shutdown();
+ this.connection.destroyResource(consumerInfo);
+ } finally {
+ deliveringLock.writeLock().unlock();
+ }
}
protected void shutdown() throws JMSException {
@@ -250,7 +264,7 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
if (session instanceof JmsSession) {
- ((JmsSession) session).enqueueInSession(envelope);
+ ((JmsSession) session).enqueueInSession(new DeliveryTask(envelope));
} else {
LOG.warn("ServerSession provided an unknown JMS Session type to this ConnectionConsumer: {}", session);
}
@@ -286,4 +300,59 @@ public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDisp
}
}
}
-}
\ No newline at end of file
+
+ private final class DeliveryTask implements Consumer<JmsSession> {
+
+ private final JmsInboundMessageDispatch envelope;
+
+ public DeliveryTask(JmsInboundMessageDispatch envelope) {
+ this.envelope = envelope;
+ }
+
+ @Override
+ public void accept(JmsSession session) {
+ deliveringLock.readLock().lock();
+
+ try {
+ if (closed.get()) {
+ return; // Message has been released.
+ }
+
+ JmsMessage copy = null;
+
+ if (envelope.getMessage().isExpired()) {
+ LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope);
+ session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
+ } else if (session.redeliveryExceeded(envelope)) {
+ LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope);
+ JmsRedeliveryPolicy redeliveryPolicy = envelope.getConsumerInfo().getRedeliveryPolicy();
+ session.acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(envelope.getConsumerInfo().getDestination())));
+ } else {
+ boolean deliveryFailed = false;
+
+ copy = session.acknowledge(envelope, ACK_TYPE.DELIVERED).getMessage().copy();
+
+ session.clearSessionRecovered();
+
+ try {
+ session.getMessageListener().onMessage(copy);
+ } catch (RuntimeException rte) {
+ deliveryFailed = true;
+ }
+
+ if (!session.isSessionRecovered()) {
+ if (!deliveryFailed) {
+ session.acknowledge(envelope, ACK_TYPE.ACCEPTED);
+ } else {
+ session.acknowledge(envelope, ACK_TYPE.RELEASED);
+ }
+ }
+ }
+ } catch (Exception e) {
+ getConnection().onAsyncException(e);
+ } finally {
+ deliveringLock.readLock().unlock();
+ }
+ }
+ }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c373426/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 6152991..77df75c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -16,11 +16,10 @@
*/
package org.apache.qpid.jms;
-import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
-
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
@@ -38,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import javax.jms.BytesMessage;
import javax.jms.CompletionListener;
@@ -94,8 +94,6 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.selector.SelectorParser;
import org.apache.qpid.jms.selector.filter.FilterException;
-import org.apache.qpid.jms.util.FifoMessageQueue;
-import org.apache.qpid.jms.util.MessageQueue;
import org.apache.qpid.jms.util.NoOpExecutor;
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
import org.slf4j.Logger;
@@ -117,7 +115,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final Map<JmsProducerId, JmsMessageProducer> producers = new ConcurrentHashMap<JmsProducerId, JmsMessageProducer>();
private final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>();
private MessageListener messageListener;
- private final MessageQueue sessionQueue = new FifoMessageQueue(16);
+
+ private final java.util.Queue<Consumer<JmsSession>> sessionQueue = new ArrayDeque<>();
+
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean started = new AtomicBoolean();
private final JmsSessionInfo sessionInfo;
@@ -728,42 +728,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
throw new RuntimeException(ex);
}
- JmsInboundMessageDispatch envelope = null;
- while ((envelope = sessionQueue.dequeueNoWait()) != null) {
- try {
- JmsMessage copy = null;
-
- if (envelope.getMessage().isExpired()) {
- LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope);
- acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
- } else if (redeliveryExceeded(envelope)) {
- LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope);
- JmsRedeliveryPolicy redeliveryPolicy = envelope.getConsumerInfo().getRedeliveryPolicy();
- acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(envelope.getConsumerInfo().getDestination())));
- } else {
- boolean deliveryFailed = false;
-
- copy = acknowledge(envelope, ACK_TYPE.DELIVERED).getMessage().copy();
-
- clearSessionRecovered();
-
- try {
- messageListener.onMessage(copy);
- } catch (RuntimeException rte) {
- deliveryFailed = true;
- }
-
- if (!isSessionRecovered()) {
- if (!deliveryFailed) {
- acknowledge(envelope, ACK_TYPE.ACCEPTED);
- } else {
- acknowledge(envelope, ACK_TYPE.RELEASED);
- }
- }
- }
- } catch (Exception e) {
- getConnection().onException(e);
- }
+ Consumer<JmsSession> dispatcher = null;
+ while ((dispatcher = sessionQueue.poll()) != null) {
+ dispatcher.accept(this);
}
}
@@ -1108,8 +1075,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
for (JmsMessageConsumer consumer : consumers.values()) {
consumer.start();
}
-
- sessionQueue.start();
}
}
@@ -1120,8 +1085,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
consumer.stop();
}
- sessionQueue.stop();
-
synchronized (sessionInfo) {
if (deliveryExecutor != null) {
deliveryExecutor.shutdown();
@@ -1423,8 +1386,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
- void enqueueInSession(JmsInboundMessageDispatch envelope) {
- sessionQueue.enqueue(envelope);
+ void enqueueInSession(Consumer<JmsSession> dispatcher) {
+ sessionQueue.add(dispatcher);
}
//----- Asynchronous Send Helpers ----------------------------------------//
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6c373426/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
index f92357d..30885b8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
@@ -133,7 +133,63 @@ public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase {
}
assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS));
- testPeer.waitForAllHandlersToComplete(2000);
+
+ testPeer.expectDetach(true, true, true);
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testPauseInOnMessageAndConsumerClosed() throws Exception {
+ final CountDownLatch messageArrived = new CountDownLatch(1);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ testPeer.expectBegin();
+
+ // Create a session for our ServerSessionPool to use
+ Session session = connection.createSession();
+ session.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ messageArrived.countDown();
+
+ LOG.trace("Pausing onMessage to check for race on connection consumer close");
+
+ // Pause a bit to see if we race consumer close and our own
+ // message accept attempt by the delivering Session.
+ try {
+ TimeUnit.MILLISECONDS.sleep(10);
+ } catch (InterruptedException e) {
+ }
+
+ LOG.trace("Paused onMessage to check for race on connection consumer close");
+ }
+ });
+ JmsServerSession serverSession = new JmsServerSession(session);
+ JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession);
+
+ // Now the Connection consumer arrives and we give it a message
+ // to be dispatched to the server session.
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ Queue queue = new JmsQueue("myQueue");
+ ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ connection.start();
+
+ assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS));
testPeer.expectDetach(true, true, true);
consumer.close();
@@ -240,7 +296,6 @@ public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase {
connection.start();
assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS));
- testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
@@ -303,7 +358,6 @@ public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase {
connection.start();
assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS));
- testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectDetach(true, true, true);
consumer.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org