You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/08/26 20:22:29 UTC

[2/2] qpid-jms git commit: QPIDJMS-102: synchronize creation of the executor to avoid concurrent creation and subsequent issues with initial async deliveries

QPIDJMS-102: synchronize creation of the executor to avoid concurrent creation and subsequent issues with initial async deliveries


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/27c72421
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/27c72421
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/27c72421

Branch: refs/heads/master
Commit: 27c7242168003586796a0f26d905df31e8bd7735
Parents: aabcc42
Author: Robert Gemmell <ro...@apache.org>
Authored: Wed Aug 26 19:22:11 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Wed Aug 26 19:22:11 2015 +0100

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 39 +++++----
 .../jms/integration/SessionIntegrationTest.java | 86 +++++++++++++++++++-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 19 ++++-
 3 files changed, 124 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/27c72421/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 8b75287..ca63436 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -94,7 +94,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
         new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
     private JmsPrefetchPolicy prefetchPolicy;
     private final JmsSessionInfo sessionInfo;
-    private ExecutorService executor;
+    private volatile ExecutorService executor;
     private final ReentrantLock sendLock = new ReentrantLock();
 
     private final AtomicLong consumerIdGenerator = new AtomicLong();
@@ -860,9 +860,11 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
             consumer.stop();
         }
 
-        if (executor != null) {
-            executor.shutdown();
-            executor = null;
+        synchronized (sessionInfo) {
+            if (executor != null) {
+                executor.shutdown();
+                executor = null;
+            }
         }
     }
 
@@ -875,19 +877,26 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
     }
 
     Executor getExecutor() {
-        if (executor == null) {
-            executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
-
-                @Override
-                public Thread newThread(Runnable runner) {
-                    Thread executor = new Thread(runner);
-                    executor.setName("JmsSession ["+ sessionInfo.getSessionId() + "] dispatcher");
-                    executor.setDaemon(true);
-                    return executor;
+        ExecutorService exec = executor;
+        if(exec == null) {
+            synchronized (sessionInfo) {
+                if (executor == null) {
+                    executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+                        @Override
+                        public Thread newThread(Runnable runner) {
+                            Thread executor = new Thread(runner);
+                            executor.setName("JmsSession ["+ sessionInfo.getSessionId() + "] dispatcher");
+                            executor.setDaemon(true);
+                            return executor;
+                        }
+                    });
                 }
-            });
+
+                exec = executor;
+            }
         }
-        return executor;
+
+        return exec;
     }
 
     protected JmsSessionInfo getSessionInfo() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/27c72421/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 25e05ba..9881bf2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -32,7 +32,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -42,6 +44,7 @@ import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -83,8 +86,12 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SessionIntegrationTest extends QpidJmsTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(SessionIntegrationTest.class);
+
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
     @Test(timeout = 20000)
@@ -1193,7 +1200,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             // the Flow was in flight to the peer), and then DONT send a flow frame back to the client
             // as it can tell from the messages that all the credit has been used.
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
-                                                       messageCount, true, false, equalTo(UnsignedInteger.valueOf(messageCount)), 1);
+                                                       messageCount, true, false, equalTo(UnsignedInteger.valueOf(messageCount)), 1, false);
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with accepted and settled disposition to indicate the rollback succeeded
@@ -1458,4 +1465,81 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             connection.close();
         }
     }
+
+    @Test(timeout = 20000)
+    public void testAsyncDeliveryOrder() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            // Create a consumer; the connection is already started, so expect a flow and respond with the transfers
+            testPeer.expectReceiverAttach();
+
+            int messageCount = 10;
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"),
+                    messageCount, false, false, equalTo(UnsignedInteger.valueOf(messageCount)), 1, true);
+
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+            for (int i = 1; i <= messageCount; i++) {
+                // Then expect a *settled* TransactionalState disposition for each message once received by the consumer
+                TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+                stateMatcher.withTxnId(equalTo(txnId));
+                stateMatcher.withOutcome(new AcceptedMatcher());
+
+                //TODO: could also match on delivery IDs
+                testPeer.expectDisposition(true, stateMatcher);
+            }
+
+            final CountDownLatch done = new CountDownLatch(messageCount);
+            final AtomicInteger index = new AtomicInteger(-1);
+
+            consumer.setMessageListener(new DeliveryOrderListener(done, index));
+
+            testPeer.waitForAllHandlersToComplete(3000);
+            assertTrue("Not all messages received in given time", done.await(10, TimeUnit.SECONDS));
+            assertEquals("Messages were not in expected order, final index was wrong", messageCount - 1, index.get());
+        }
+    }
+
+    private static class DeliveryOrderListener implements MessageListener {
+        private final CountDownLatch done;
+        private final AtomicInteger index;
+
+        private DeliveryOrderListener(CountDownLatch done, AtomicInteger index) {
+            this.done = done;
+            this.index = index;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                int messageNumber = message.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER);
+
+                LOG.info("Listener received message: {}", messageNumber);
+
+                index.compareAndSet(messageNumber - 1, messageNumber);
+
+                done.countDown();
+            } catch (Exception e) {
+                LOG.error("Caught exception in listener", e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/27c72421/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index f9ca641..01b7e3b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -97,6 +97,8 @@ public class TestAmqpPeer implements AutoCloseable
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(TestAmqpPeer.class.getName());
 
+    public static final String MESSAGE_NUMBER = "MessageNumber";
+
     private static final Symbol ANONYMOUS = Symbol.valueOf("ANONYMOUS");
     private static final Symbol EXTERNAL = Symbol.valueOf("EXTERNAL");
     private static final Symbol PLAIN = Symbol.valueOf("PLAIN");
@@ -1102,7 +1104,7 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectLinkFlow(boolean drain, boolean sendDrainFlowResponse, Matcher<UnsignedInteger> creditMatcher)
     {
-        expectLinkFlowRespondWithTransfer(null, null, null, null, null, 0, drain, sendDrainFlowResponse, creditMatcher, null);
+        expectLinkFlowRespondWithTransfer(null, null, null, null, null, 0, drain, sendDrainFlowResponse, creditMatcher, null, false);
     }
 
     public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
@@ -1124,19 +1126,20 @@ public class TestAmqpPeer implements AutoCloseable
     {
         expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
                                           appPropertiesDescribedType, content, count, false, false,
-                                          Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)), 1);
+                                          Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)), 1, false);
     }
 
     public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
             final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
             final PropertiesDescribedType propertiesDescribedType,
-            final ApplicationPropertiesDescribedType appPropertiesDescribedType,
+            ApplicationPropertiesDescribedType appPropertiesDescribedType,
             final DescribedType content,
             final int count,
             final boolean drain,
             final boolean sendDrainFlowResponse,
             Matcher<UnsignedInteger> creditMatcher,
-            final Integer nextIncomingId)
+            final Integer nextIncomingId,
+            boolean addMessageNumberProperty)
     {
         if (nextIncomingId == null && count > 0)
         {
@@ -1171,6 +1174,10 @@ public class TestAmqpPeer implements AutoCloseable
         CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
         boolean addComposite = false;
 
+        if (appPropertiesDescribedType == null && addMessageNumberProperty) {
+            appPropertiesDescribedType = new ApplicationPropertiesDescribedType();
+        }
+
         for(int i = 0; i < count; i++)
         {
             final int nextId = nextIncomingId + i;
@@ -1178,6 +1185,10 @@ public class TestAmqpPeer implements AutoCloseable
             String tagString = "theDeliveryTag" + nextId;
             Binary dtag = new Binary(tagString.getBytes());
 
+            if(addMessageNumberProperty) {
+                appPropertiesDescribedType.setApplicationProperty(MESSAGE_NUMBER, i);
+            }
+
             final TransferFrame transferResponse = new TransferFrame()
             .setDeliveryId(UnsignedInteger.valueOf(nextId))
             .setDeliveryTag(dtag)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org