You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/02 22:21:05 UTC

qpid-jms git commit: QPIDJMS-207 Some cleanups around async send completions

Repository: qpid-jms
Updated Branches:
  refs/heads/master 338ec6617 -> 4c5b08d15


QPIDJMS-207 Some cleanups around async send completions

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4c5b08d1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4c5b08d1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4c5b08d1

Branch: refs/heads/master
Commit: 4c5b08d157ccf9599814f4522f124164bf509476
Parents: 338ec66
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Dec 2 17:20:33 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Dec 2 17:20:33 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 14 ++++---
 .../jms/provider/amqp/AmqpFixedProducer.java    |  2 +-
 .../provider/amqp/AmqpTransactionContext.java   | 11 +++++-
 .../integration/ProducerIntegrationTest.java    | 39 ++++++++++++++++++++
 4 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4c5b08d1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index bed9dc6..0202d42 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -291,17 +292,21 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             synchronized (sessionInfo) {
                 if (completionExcecutor != null) {
                     completionExcecutor.shutdown();
+                    try {
+                        completionExcecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        LOG.trace("Session close awaiting send completions was interrupted");
+                    }
                     completionExcecutor = null;
                 }
             }
         }
     }
 
+    //----- Events fired when resource remotely closed due to some error -----//
+
     void sessionClosed(Exception cause) {
         try {
-            // TODO - This assumes we can't rely on the AmqpProvider to signal all pending
-            //        asynchronous send completions that they are failed when the session
-            //        is remotely closed.
             getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
             shutdown(cause);
         } catch (Throwable error) {
@@ -332,9 +337,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
         try {
             if (producer != null) {
-                // TODO - This assumes we can't rely on the AmqpProvider to signal all pending
-                //        asynchronous send completions that they are failed when the producer
-                //        is remotely closed.
                 getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(
                     producer.getProducerId(), JmsExceptionSupport.create(cause)));
                 producer.shutdown(cause);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4c5b08d1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 53a814b..324fdb9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -402,11 +402,11 @@ public class AmqpFixedProducer extends AmqpProducer {
             // Put the message back to usable state following send complete
             envelope.getMessage().onSendComplete();
 
-            // TODO - Should this take blocked sends into consideration.
             // Signal the watcher that all pending sends have completed if one is registered
             // and both the in-flight sends and blocked sends have completed.
             if (sendCompletionWatcher != null && sent.isEmpty() && blocked.isEmpty()) {
                 sendCompletionWatcher.onSuccess();
+                sendCompletionWatcher = null;
             }
 
             // Once the pending sends queue is drained and all in-flight sends have been

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4c5b08d1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 2d08bc7..4ea7465 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -130,8 +130,15 @@ public class AmqpTransactionContext implements AmqpResourceParent {
 
         DischargeCompletion dischargeResult = new DischargeCompletion(request, true);
 
-        LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
-        coordinator.discharge(current, dischargeResult, true);
+        if (txProducers.isEmpty()) {
+            LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
+            coordinator.discharge(current, dischargeResult, true);
+        } else {
+            SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), true);
+            for (AmqpProducer producer : txProducers) {
+                producer.addSendCompletionWatcher(producersSendCompletion);
+            }
+        }
     }
 
     public void rollback(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4c5b08d1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 4df522e..a92eb67 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2031,6 +2031,45 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testAsyncCompletionGetsNotifiedWhenProducerClosed() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+            // Use a send timeout to trigger the completion event
+            connection.setSendTimeout(500);
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            testPeer.expectDetach(true, true, true);
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            producer.send(message, listener);
+            producer.close();
+
+            assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNotNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testAsyncCompletionResetsBytesMessage() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org