You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/02 22:21:05 UTC
qpid-jms git commit: QPIDJMS-207 Some cleanups around async send completions
Repository: qpid-jms
Updated Branches:
refs/heads/master 338ec6617 -> 4c5b08d15
QPIDJMS-207 Some cleanups around async send completions
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4c5b08d1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4c5b08d1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4c5b08d1
Branch: refs/heads/master
Commit: 4c5b08d157ccf9599814f4522f124164bf509476
Parents: 338ec66
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Dec 2 17:20:33 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Dec 2 17:20:33 2016 -0500
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsSession.java | 14 ++++---
.../jms/provider/amqp/AmqpFixedProducer.java | 2 +-
.../provider/amqp/AmqpTransactionContext.java | 11 +++++-
.../integration/ProducerIntegrationTest.java | 39 ++++++++++++++++++++
4 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4c5b08d1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index bed9dc6..0202d42 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -291,17 +292,21 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
synchronized (sessionInfo) {
if (completionExcecutor != null) {
completionExcecutor.shutdown();
+ try {
+ completionExcecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.trace("Session close awaiting send completions was interrupted");
+ }
completionExcecutor = null;
}
}
}
}
+ //----- Events fired when resource remotely closed due to some error -----//
+
void sessionClosed(Exception cause) {
try {
- // TODO - This assumes we can't rely on the AmqpProvider to signal all pending
- // asynchronous send completions that they are failed when the session
- // is remotely closed.
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
shutdown(cause);
} catch (Throwable error) {
@@ -332,9 +337,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
try {
if (producer != null) {
- // TODO - This assumes we can't rely on the AmqpProvider to signal all pending
- // asynchronous send completions that they are failed when the producer
- // is remotely closed.
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(
producer.getProducerId(), JmsExceptionSupport.create(cause)));
producer.shutdown(cause);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4c5b08d1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 53a814b..324fdb9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -402,11 +402,11 @@ public class AmqpFixedProducer extends AmqpProducer {
// Put the message back to usable state following send complete
envelope.getMessage().onSendComplete();
- // TODO - Should this take blocked sends into consideration.
// Signal the watcher that all pending sends have completed if one is registered
// and both the in-flight sends and blocked sends have completed.
if (sendCompletionWatcher != null && sent.isEmpty() && blocked.isEmpty()) {
sendCompletionWatcher.onSuccess();
+ sendCompletionWatcher = null;
}
// Once the pending sends queue is drained and all in-flight sends have been
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4c5b08d1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 2d08bc7..4ea7465 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -130,8 +130,15 @@ public class AmqpTransactionContext implements AmqpResourceParent {
DischargeCompletion dischargeResult = new DischargeCompletion(request, true);
- LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
- coordinator.discharge(current, dischargeResult, true);
+ if (txProducers.isEmpty()) {
+ LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
+ coordinator.discharge(current, dischargeResult, true);
+ } else {
+ SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), true);
+ for (AmqpProducer producer : txProducers) {
+ producer.addSendCompletionWatcher(producersSendCompletion);
+ }
+ }
}
public void rollback(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4c5b08d1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 4df522e..a92eb67 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2031,6 +2031,45 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testAsyncCompletionGetsNotifiedWhenProducerClosed() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+ // Use a send timeout to trigger the completion event
+ connection.setSendTimeout(500);
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+ testPeer.expectDetach(true, true, true);
+ testPeer.expectClose();
+
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+ producer.send(message, listener);
+ producer.close();
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
+ assertNotNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testAsyncCompletionResetsBytesMessage() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org