You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/02/17 19:58:08 UTC

qpid-jms git commit: QPIDJMS-266 Fix race on connection start with inbound messages

Repository: qpid-jms
Updated Branches:
  refs/heads/master f3b9cd01f -> 1b8f24655


QPIDJMS-266 Fix race on connection start with inbound messages

If a connection is started in which consumer have been created and
messages are incoming a call to start can end in a race where on
incoming message can jump the line and be delivered ahead of previously
queued messages.  Stop storing messages in the session stopped messages
queue and just pass them along to the consumer, it won't dispatch them
until started again.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1b8f2465
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1b8f2465
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1b8f2465

Branch: refs/heads/master
Commit: 1b8f246553816947f8bab66b522ec6f5b84707b2
Parents: f3b9cd0
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Feb 17 14:57:42 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Feb 17 14:57:42 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 13 +---
 .../transactions/JmsTransactedConsumerTest.java | 72 ++++++++++++--------
 2 files changed, 46 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b8f2465/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 3b34674..ca3bb26 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -105,8 +104,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private MessageListener messageListener;
     private final AtomicBoolean closed = new AtomicBoolean();
     private final AtomicBoolean started = new AtomicBoolean();
-    private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages =
-        new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
     private final JmsSessionInfo sessionInfo;
     private final ReentrantLock sendLock = new ReentrantLock();
     private volatile ExecutorService deliveryExecutor;
@@ -996,10 +993,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
     protected void start() throws JMSException {
         if (started.compareAndSet(false, true)) {
-            JmsInboundMessageDispatch message = null;
-            while ((message = this.stoppedMessages.poll()) != null) {
-                deliver(message);
-            }
             for (JmsMessageConsumer consumer : consumers.values()) {
                 consumer.start();
             }
@@ -1204,11 +1197,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
     @Override
     public void onInboundMessage(JmsInboundMessageDispatch envelope) {
-        if (started.get()) {
-            deliver(envelope);
-        } else {
-            stoppedMessages.add(envelope);
-        }
+        deliver(envelope);
     }
 
     protected void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b8f2465/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index cde7656..5188a52 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.transactions;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.net.URI;
 import java.util.ArrayList;
@@ -26,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.Message;
@@ -539,34 +541,50 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport {
     @Test(timeout = 90000)
     public void testConsumerMessagesInOrder() throws Exception {
 
-        for (int i = 0; i < 5; ++i) {
-
-            connection = createAmqpConnection();
-            connection.start();
-
-            final int MESSAGE_COUNT = 20;
-
-            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue(name.getMethodName());
-
-            sendToAmqQueue(MESSAGE_COUNT);
-
-            MessageConsumer consumer = session.createConsumer(queue);
-
-            for (int j = 0; j < MESSAGE_COUNT; ++j) {
-                Message message = consumer.receive(5000);
-                assertNotNull(message);
-                assertEquals(j + 1, message.getIntProperty(MESSAGE_NUMBER));
+        final int ITERATIONS = 5;
+        final int MESSAGE_COUNT = 4;
+
+        for (int i = 0; i < ITERATIONS; i++) {
+            LOG.debug("##----------- Iteration {} -----------##", i);
+            Connection consumingConnection = createAmqpConnection();
+            Connection producingConnection = createAmqpConnection();
+
+            Session consumerSession = consumingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = consumerSession.createQueue("my_queue");
+            MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+            Session producerSession = producingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer producer = producerSession.createProducer(queue);
+
+            ArrayList<TextMessage> received = new ArrayList<>();
+
+            try {
+                for (int j = 0; j < MESSAGE_COUNT; j++) {
+                    producer.send(producerSession.createTextMessage("msg" + (j + 1)));
+                }
+
+                producerSession.commit();
+
+                consumingConnection.start();
+
+                for (int k = 0; k < MESSAGE_COUNT; k++) {
+                    TextMessage tm = (TextMessage) consumer.receive(5000);
+                    assertNotNull(tm);
+                    received.add(tm);
+                }
+
+                for (int l = 0; l < MESSAGE_COUNT; l++) {
+                    TextMessage tm = received.get(l);
+                    assertNotNull(tm);
+                    if (!("msg" + (l + 1)).equals(tm.getText())) {
+                        fail("Out of order, expected " + ("msg" + (l + 1)) + " but got: " + tm.getText());
+                    }
+                }
+                consumerSession.commit();
+            } finally {
+                consumingConnection.close();
+                producingConnection.close();
             }
-
-            session.close();
-
-            QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
-            proxy.purge();
-
-            assertEquals(0, proxy.getQueueSize());
-
-            consumer.close();
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org