You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:04 UTC
[3/7] qpid-jms git commit: QPIDJMS-207 Adds support for Asynchronous
JMS 2.0 sends.
QPIDJMS-207 Adds support for Asynchronous JMS 2.0 sends.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3a03663b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3a03663b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3a03663b
Branch: refs/heads/master
Commit: 3a03663b79f98f80cd75f297cd9b70241ac68da3
Parents: 6553cfd
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 12 13:09:56 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Sep 12 13:09:56 2016 -0400
----------------------------------------------------------------------
.../apache/qpid/jms/JmsCompletionListener.java | 47 ++
.../java/org/apache/qpid/jms/JmsConnection.java | 36 +-
.../org/apache/qpid/jms/JmsMessageConsumer.java | 6 +-
.../org/apache/qpid/jms/JmsMessageProducer.java | 100 +++-
.../java/org/apache/qpid/jms/JmsSession.java | 349 +++++++++--
.../jms/message/JmsInboundMessageDispatch.java | 21 +-
.../jms/message/JmsOutboundMessageDispatch.java | 42 +-
.../jms/provider/DefaultProviderListener.java | 9 +
.../qpid/jms/provider/ProviderListener.java | 22 +
.../qpid/jms/provider/ProviderWrapper.java | 10 +
.../jms/provider/amqp/AmqpAbstractResource.java | 5 +-
.../amqp/AmqpAnonymousFallbackProducer.java | 17 +-
.../qpid/jms/provider/amqp/AmqpConsumer.java | 55 +-
.../qpid/jms/provider/amqp/AmqpEventSink.java | 6 +-
.../jms/provider/amqp/AmqpExceptionBuilder.java | 34 ++
.../jms/provider/amqp/AmqpFixedProducer.java | 257 +++++---
.../qpid/jms/provider/amqp/AmqpProducer.java | 15 +-
.../qpid/jms/provider/amqp/AmqpProvider.java | 39 +-
.../provider/amqp/AmqpTransactionContext.java | 155 +++--
.../amqp/AmqpTransactionCoordinator.java | 4 +-
.../amqp/builders/AmqpResourceBuilder.java | 13 +-
.../jms/provider/failover/FailoverProvider.java | 18 +
.../integration/ConsumerIntegrationTest.java | 109 ++++
.../PresettledProducerIntegrationTest.java | 231 ++++++++
.../integration/ProducerIntegrationTest.java | 592 +++++++++++++++++++
.../jms/integration/SessionIntegrationTest.java | 114 +++-
.../jms/producer/JmsMessageProducerTest.java | 421 ++++++++++++-
.../failover/FailoverIntegrationTest.java | 194 ++++++
.../qpid/jms/provider/mock/MockProvider.java | 12 +-
.../mock/MockProviderConfiguration.java | 10 +
.../qpid/jms/provider/mock/MockRemotePeer.java | 123 ++++
31 files changed, 2793 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
new file mode 100644
index 0000000..7a6c4d6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Message;
+
+/**
+ * Interface used to implement listeners for asynchronous {@link javax.jms.Message}
+ * sends which will be notified on successful completion of a send or be notified of an
+ * error that was encountered while attempting to send a {@link javax.jms.Message}.
+ */
+public interface JmsCompletionListener {
+
+ /**
+ * Called when an asynchronous send operation completes successfully.
+ *
+ * @param message
+ * the {@link javax.jms.Message} that was successfully sent.
+ */
+ void onCompletion(Message message);
+
+ /**
+ * Called when an asynchronous send operation fails to complete; the state
+ * of the send is unknown at this point.
+ *
+ * @param message
+ * the {@link javax.jms.Message} that was to be sent.
+ * @param exception
+ * the {@link java.lang.Exception} that describes the send error.
+ */
+ void onException(Message message, Exception exception);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 827da11..a04d1b3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -156,6 +156,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
public void close() throws JMSException {
boolean interrupted = Thread.interrupted();
+ for (JmsSession session : sessions.values()) {
+ session.checkIsDeliveryThread();
+ session.checkIsCompletionThread();
+ }
+
try {
if (!closed.get() && !failed.get()) {
@@ -1072,6 +1077,26 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
@Override
+ public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) {
+ JmsSession session = sessions.get(envelope.getProducerId().getParentId());
+ if (session != null) {
+ session.onCompletedMessageSend(envelope);
+ } else {
+ LOG.debug("No matching Session found for async send result");
+ }
+ }
+
+ @Override
+ public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause) {
+ JmsSession session = sessions.get(envelope.getProducerId().getParentId());
+ if (session != null) {
+ session.onFailedMessageSend(envelope, cause);
+ } else {
+ LOG.debug("No matching Session found for failed async send result");
+ }
+ }
+
+ @Override
public void onConnectionInterrupted(final URI remoteURI) {
for (JmsSession session : sessions.values()) {
session.onConnectionInterrupted();
@@ -1161,6 +1186,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
public void onConnectionFailure(final IOException ex) {
providerFailed(ex);
+ // Signal that the connection dropped; we need to mark transactions as
+ // failed, deliver failure events to asynchronous send completions, etc.
+ for (JmsSession session : sessions.values()) {
+ session.onConnectionInterrupted();
+ }
+
onProviderException(ex);
for (AsyncResult request : requests.keySet()) {
@@ -1304,10 +1335,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
if (!closed.get() && !closing.get()) {
if (this.exceptionListener != null) {
- if (!(error instanceof JMSException)) {
- error = JmsExceptionSupport.create(error);
- }
- final JMSException jmsError = (JMSException)error;
+ final JMSException jmsError = JmsExceptionSupport.create(error);
executor.execute(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index de7ef63..18fd764 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -438,10 +438,10 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
}
if (this.messageListener != null && this.started) {
- session.getExecutor().execute(new MessageDeliverTask());
+ session.getDispatcherExecutor().execute(new MessageDeliverTask());
} else {
if (availableListener != null) {
- session.getExecutor().execute(new Runnable() {
+ session.getDispatcherExecutor().execute(new Runnable() {
@Override
public void run() {
if (session.isStarted()) {
@@ -507,7 +507,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
void drainMessageQueueToListener() {
if (this.messageListener != null && this.started) {
- session.getExecutor().execute(new MessageDeliverTask());
+ session.getDispatcherExecutor().execute(new MessageDeliverTask());
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index a1bbe38..65812b7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -158,7 +158,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination");
}
- sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive);
+ sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive, null);
}
@Override
@@ -174,15 +174,107 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination.");
}
- sendMessage(destination, message, deliveryMode, priority, timeToLive);
+ sendMessage(destination, message, deliveryMode, priority, timeToLive, null);
}
- private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ /**
+ * Sends the message asynchronously and notifies the assigned listener on success or failure
+ *
+ * @param message
+ * the {@link javax.jms.Message} to send.
+ * @param listener
+ * the {@link JmsCompletionListener} to notify on send success or failure.
+ *
+ * @throws JMSException if an error occurs while attempting to send the Message.
+ */
+ public void send(Message message, JmsCompletionListener listener) throws JMSException {
+ send(message, this.deliveryMode, this.priority, this.timeToLive, listener);
+ }
+
+ /**
+ * Sends the message asynchronously and notifies the assigned listener on success or failure
+ *
+ * @param message
+ * the {@link javax.jms.Message} to send.
+ * @param deliveryMode
+ * the delivery mode to assign to the outbound Message.
+ * @param priority
+ * the priority to assign to the outbound Message.
+ * @param timeToLive
+ * the time to live value to assign to the outbound Message.
+ * @param listener
+ * the {@link JmsCompletionListener} to notify on send success or failure.
+ *
+ * @throws JMSException if an error occurs while attempting to send the Message.
+ */
+ public void send(Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+ checkClosed();
+
+ if (anonymousProducer) {
+ throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination");
+ }
+
+ if (listener == null) {
+ throw new IllegalArgumentException("JmsCompletetionListener cannot be null");
+ }
+
+ sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive, listener);
+ }
+
+ /**
+ * Sends the message asynchronously and notifies the assigned listener on success or failure
+ *
+ * @param destination
+ * the Destination to send the given Message to.
+ * @param message
+ * the {@link javax.jms.Message} to send.
+ * @param listener
+ * the {@link JmsCompletionListener} to notify on send success or failure.
+ *
+ * @throws JMSException if an error occurs while attempting to send the Message.
+ */
+ public void send(Destination destination, Message message, JmsCompletionListener listener) throws JMSException {
+ send(destination, message, this.deliveryMode, this.priority, this.timeToLive, listener);
+ }
+
+ /**
+ * Sends the message asynchronously and notifies the assigned listener on success or failure
+ *
+ * @param destination
+ * the Destination to send the given Message to.
+ * @param message
+ * the {@link javax.jms.Message} to send.
+ * @param deliveryMode
+ * the delivery mode to assign to the outbound Message.
+ * @param priority
+ * the priority to assign to the outbound Message.
+ * @param timeToLive
+ * the time to live value to assign to the outbound Message.
+ * @param listener
+ * the {@link JmsCompletionListener} to notify on send success or failure.
+ *
+ * @throws JMSException if an error occurs while attempting to send the Message.
+ */
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+ checkClosed();
+
+ if (!anonymousProducer) {
+ throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination.");
+ }
+
+ if (listener == null) {
+ throw new IllegalArgumentException("JmsCompletetionListener cannot be null");
+ }
+
+ sendMessage(destination, message, deliveryMode, priority, timeToLive, listener);
+ }
+
+ private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
if (destination == null) {
throw new InvalidDestinationException("Don't understand null destinations");
}
- this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp);
+ this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, listener);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 4644267..817f342 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -18,8 +18,11 @@ package org.apache.qpid.jms;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -58,6 +61,7 @@ import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsMessageTransformation;
@@ -98,14 +102,18 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages =
new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
private final JmsSessionInfo sessionInfo;
- private volatile ExecutorService executor;
private final ReentrantLock sendLock = new ReentrantLock();
+ private volatile ExecutorService deliveryExecutor;
+ private volatile ExecutorService completionExcecutor;
+ private Thread deliveryThread;
+ private Thread completionThread;
private final AtomicLong consumerIdGenerator = new AtomicLong();
private final AtomicLong producerIdGenerator = new AtomicLong();
private JmsTransactionContext transactionContext;
private boolean sessionRecovered;
private final AtomicReference<Exception> failureCause = new AtomicReference<Exception>();
+ private final Deque<SendCompletion> asyncSendQueue = new ConcurrentLinkedDeque<SendCompletion>();
protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
this.connection = connection;
@@ -178,6 +186,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
@Override
public void commit() throws JMSException {
checkClosed();
+ checkIsCompletionThread();
if (!getTransacted()) {
throw new javax.jms.IllegalStateException("Not a transacted session");
@@ -189,6 +198,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
@Override
public void rollback() throws JMSException {
checkClosed();
+ checkIsCompletionThread();
if (!getTransacted()) {
throw new javax.jms.IllegalStateException("Not a transacted session");
@@ -223,6 +233,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
@Override
public void close() throws JMSException {
+ checkIsDeliveryThread();
+ checkIsCompletionThread();
+
if (!closed.get()) {
doClose();
}
@@ -272,11 +285,22 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
transactionContext.shutdown();
+
+ synchronized (sessionInfo) {
+ if (completionExcecutor != null) {
+ completionExcecutor.shutdown();
+ completionExcecutor = null;
+ }
+ }
}
}
void sessionClosed(Exception cause) {
try {
+ // TODO - This assumes we can't rely on the AmqpProvider to signal all pending
+ // asynchronous send completions that they are failed when the session
+ // is remotely closed.
+ getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
shutdown(cause);
} catch (Throwable error) {
LOG.trace("Ignoring exception thrown during cleanup of closed session", error);
@@ -306,6 +330,11 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
try {
if (producer != null) {
+ // TODO - This assumes we can't rely on the AmqpProvider to signal all pending
+ // asynchronous send completions that they are failed when the producer
+ // is remotely closed.
+ getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(
+ producer.getProducerId(), JmsExceptionSupport.create(cause)));
producer.shutdown(cause);
}
} catch (Throwable error) {
@@ -624,17 +653,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
connection.onException(ex);
}
- protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException {
+ protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException {
JmsDestination destination = JmsMessageTransformation.transformDestination(connection, dest);
if (destination.isTemporary() && ((JmsTemporaryDestination) destination).isDeleted()) {
throw new IllegalStateException("Temporary destination has been deleted");
}
- send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp);
+ send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp, listener);
}
- private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException {
+ private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException {
sendLock.lock();
try {
original.setJMSDeliveryMode(deliveryMode);
@@ -707,14 +736,35 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
envelope.setMessage(copy);
envelope.setProducerId(producer.getProducerId());
envelope.setDestination(destination);
- envelope.setSendAsync(!sync);
+ envelope.setSendAsync(listener == null ? !sync : true);
envelope.setDispatchId(messageSequence);
+ envelope.setCompletionRequired(listener != null);
if (producer.isAnonymous()) {
envelope.setPresettle(getPresettlePolicy().isProducerPresttled(this, destination));
}
- transactionContext.send(connection, envelope);
+ SendCompletion completion = null;
+ if (envelope.isCompletionRequired()) {
+ completion = new SendCompletion(envelope, listener);
+ asyncSendQueue.addLast(completion);
+ }
+
+ try {
+ transactionContext.send(connection, envelope);
+ } catch (JMSException jmsEx) {
+ // If the synchronous portion of the send fails the completion may already
+ // have been marked completed, depending on the circumstances of the failure;
+ // remove it from the queue and check if it is already completed.
+ if (completion != null) {
+ asyncSendQueue.remove(completion);
+ if (completion.hasCompleted()) {
+ return;
+ }
+ }
+
+ throw jmsEx;
+ }
} finally {
sendLock.unlock();
}
@@ -837,9 +887,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
synchronized (sessionInfo) {
- if (executor != null) {
- executor.shutdown();
- executor = null;
+ if (deliveryExecutor != null) {
+ deliveryExecutor.shutdown();
+ deliveryExecutor = null;
}
}
}
@@ -852,29 +902,62 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
return connection;
}
- Executor getExecutor() {
- ExecutorService exec = executor;
- if(exec == null) {
+ Executor getDispatcherExecutor() {
+ ExecutorService exec = deliveryExecutor;
+ if (exec == null) {
synchronized (sessionInfo) {
- if (executor == null) {
- executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable runner) {
- Thread executor = new Thread(runner);
- executor.setName("JmsSession ["+ sessionInfo.getId() + "] dispatcher");
- executor.setDaemon(true);
- return executor;
- }
- });
+ if (deliveryExecutor == null) {
+ deliveryExecutor = createExecutor("delivery dispatcher");
}
- exec = executor;
+ exec = deliveryExecutor;
+ exec.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ JmsSession.this.deliveryThread = Thread.currentThread();
+ }
+ });
}
}
return exec;
}
+ Executor getCompletionExecutor() {
+ ExecutorService exec = completionExcecutor;
+ if (exec == null) {
+ synchronized (sessionInfo) {
+ if (completionExcecutor == null) {
+ completionExcecutor = createExecutor("completion dispatcher");
+ }
+
+ exec = completionExcecutor;
+ exec.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ JmsSession.this.completionThread = Thread.currentThread();
+ }
+ });
+ }
+ }
+
+ return exec;
+ }
+
+ private ExecutorService createExecutor(final String threadNameSuffix) {
+ return Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable runner) {
+ Thread executor = new Thread(runner);
+ executor.setName("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix);
+ executor.setDaemon(true);
+ return executor;
+ }
+ });
+ }
+
protected JmsSessionInfo getSessionInfo() {
return sessionInfo;
}
@@ -925,6 +1008,18 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
+ void checkIsDeliveryThread() throws JMSException {
+ if (Thread.currentThread().equals(deliveryThread)) {
+ throw new IllegalStateException("Illegal invocation from MessageListener callback");
+ }
+ }
+
+ void checkIsCompletionThread() throws JMSException {
+ if (Thread.currentThread().equals(completionThread)) {
+ throw new IllegalStateException("Illegal invocation from CompletionListener callback");
+ }
+ }
+
public JmsMessageIDPolicy getMessageIDPolicy() {
return sessionInfo.getMessageIDPolicy();
}
@@ -945,6 +1040,36 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
return sessionInfo.getDeserializationPolicy();
}
+ /**
+ * Sets the transaction context of the session.
+ *
+ * @param transactionContext
+ * provides the means to control a JMS transaction.
+ */
+ public void setTransactionContext(JmsTransactionContext transactionContext) {
+ this.transactionContext = transactionContext;
+ }
+
+ /**
+ * Returns the transaction context of the session.
+ *
+ * @return transactionContext
+ * session's transaction context.
+ */
+ public JmsTransactionContext getTransactionContext() {
+ return transactionContext;
+ }
+
+ boolean isSessionRecovered() {
+ return sessionRecovered;
+ }
+
+ void clearSessionRecovered() {
+ sessionRecovered = false;
+ }
+
+ //----- Event handlers ---------------------------------------------------//
+
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
if (started.get()) {
@@ -954,10 +1079,22 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
+ protected void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) {
+ getCompletionExecutor().execute(new AsyncCompletionTask(envelope));
+ }
+
+ protected void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, final Throwable cause) {
+ getCompletionExecutor().execute(new AsyncCompletionTask(envelope, cause));
+ }
+
protected void onConnectionInterrupted() {
transactionContext.onConnectionInterrupted();
+ // TODO - Synthesize a better exception
+ JMSException failureCause = new JMSException("Send failed due to connection loss");
+ getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(failureCause));
+
for (JmsMessageProducer producer : producers.values()) {
producer.onConnectionInterrupted();
}
@@ -1019,31 +1156,155 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
- /**
- * Sets the transaction context of the session.
- *
- * @param transactionContext
- * provides the means to control a JMS transaction.
- */
- public void setTransactionContext(JmsTransactionContext transactionContext) {
- this.transactionContext = transactionContext;
- }
+ //----- Asynchronous Send Helpers ----------------------------------------//
- /**
- * Returns the transaction context of the session.
- *
- * @return transactionContext
- * session's transaction context.
- */
- public JmsTransactionContext getTransactionContext() {
- return transactionContext;
+ private final class FailOrCompleteAsyncCompletionsTask implements Runnable {
+
+ private final JMSException failureCause;
+ private final JmsProducerId producerId;
+
+ public FailOrCompleteAsyncCompletionsTask(JMSException failureCause) {
+ this(null, failureCause);
+ }
+
+ public FailOrCompleteAsyncCompletionsTask(JmsProducerId producerId, JMSException failureCause) {
+ this.failureCause = failureCause;
+ this.producerId = producerId;
+ }
+
+ @Override
+ public void run() {
+ // For any completion that is not yet marked as complete we fail it
+ // otherwise we send the already marked completion state event.
+ Iterator<SendCompletion> pending = asyncSendQueue.iterator();
+ while (pending.hasNext()) {
+ SendCompletion completion = pending.next();
+
+ if (!completion.hasCompleted()) {
+ if (producerId == null || producerId.equals(completion.envelope.getProducerId())) {
+ completion.markAsFailed(failureCause);
+ }
+ }
+
+ try {
+ completion.signalCompletion();
+ } catch (Throwable error) {
+ LOG.trace("Signaled completion of send: {}", completion.envelope);
+ }
+ }
+
+ asyncSendQueue.clear();
+ }
}
- boolean isSessionRecovered() {
- return sessionRecovered;
+ private final class AsyncCompletionTask implements Runnable {
+
+ private final JmsOutboundMessageDispatch envelope;
+ private final Throwable cause;
+
+ public AsyncCompletionTask(JmsOutboundMessageDispatch envelope) {
+ this(envelope, null);
+ }
+
+ public AsyncCompletionTask(JmsOutboundMessageDispatch envelope, Throwable cause) {
+ this.envelope = envelope;
+ this.cause = cause;
+ }
+
+ @Override
+ public void run() {
+ try {
+ SendCompletion completion = asyncSendQueue.peek();
+ if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) {
+ try {
+ completion = asyncSendQueue.remove();
+ if (cause == null) {
+ completion.markAsComplete();
+ } else {
+ completion.markAsFailed(JmsExceptionSupport.create(cause));
+ }
+ completion.signalCompletion();
+ } catch (Throwable error) {
+ LOG.trace("Failed while performing send completion: {}", envelope);
+ // TODO - What now?
+ }
+
+ // Signal any trailing completions that were marked complete before this
+ // one, now that the completion in front of them has been signaled.
+ Iterator<SendCompletion> pending = asyncSendQueue.iterator();
+ while (pending.hasNext()) {
+ completion = pending.next();
+ if (completion.hasCompleted()) {
+ try {
+ completion.signalCompletion();
+ } catch (Throwable error) {
+ LOG.trace("Failed while performing send completion: {}", envelope);
+ // TODO - What now?
+ } finally {
+ pending.remove();
+ }
+ } else {
+ break;
+ }
+ }
+ } else {
+ // Not head so mark as complete and wait for the one in front to send
+ // the notification of completion.
+ Iterator<SendCompletion> pending = asyncSendQueue.iterator();
+ while (pending.hasNext()) {
+ completion = pending.next();
+ if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) {
+ if (cause == null) {
+ completion.markAsComplete();
+ } else {
+ completion.markAsFailed(JmsExceptionSupport.create(cause));
+ }
+ }
+ }
+ }
+ } catch (Exception ex) {
+ LOG.debug("Send completion task encounted unexpected error: {}", ex.getMessage());
+ // TODO - What now
+ }
+ }
}
- void clearSessionRecovered() {
- sessionRecovered = false;
+ private final class SendCompletion {
+
+ private final JmsOutboundMessageDispatch envelope;
+ private final JmsCompletionListener listener;
+
+ private Exception failureCause;
+ private boolean completed;
+
+ public SendCompletion(JmsOutboundMessageDispatch envelope, JmsCompletionListener listener) {
+ this.envelope = envelope;
+ this.listener = listener;
+ }
+
+ public void markAsComplete() {
+ completed = true;
+ }
+
+ public void markAsFailed(Exception cause) {
+ completed = true;
+ failureCause = cause;
+ }
+
+ public boolean hasCompleted() {
+ return completed;
+ }
+
+ public void signalCompletion() {
+ if (failureCause == null) {
+ listener.onCompletion(envelope.getMessage());
+ } else {
+ listener.onException(envelope.getMessage(), failureCause);
+ }
+ }
+
+ public JmsOutboundMessageDispatch getEnvelope() {
+ return envelope;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index 577ac17..c038519 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -30,6 +30,8 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
private JmsMessage message;
private boolean enqueueFirst;
+ private transient String stringView;
+
public JmsInboundMessageDispatch(long sequence) {
this.sequence = sequence;
}
@@ -74,10 +76,21 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
@Override
public String toString() {
- return "JmsInboundMessageDispatch {sequence = " + sequence
- + ", messageId = " + messageId
- + ", consumerId = " + consumerId
- + "}";
+ if (stringView == null) {
+ StringBuilder builder = new StringBuilder();
+
+ builder.append("JmsInboundMessageDispatch { sequence = ");
+ builder.append(sequence);
+ builder.append(", messageId = ");
+ builder.append(messageId);
+ builder.append(", consumerId = ");
+ builder.append(consumerId);
+ builder.append(" }");
+
+ stringView = builder.toString();
+ }
+
+ return stringView;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
index 05b9089..a34768f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
@@ -29,8 +29,11 @@ public class JmsOutboundMessageDispatch {
private JmsDestination destination;
private boolean sendAsync;
private boolean presettle;
+ private boolean completionRequired;
private long dispatchId;
+ private transient String stringView;
+
public JmsDestination getDestination() {
return destination;
}
@@ -39,6 +42,10 @@ public class JmsOutboundMessageDispatch {
this.destination = destination;
}
+ public Object getMessageId() {
+ return message.getFacade().getProviderMessageIdObject();
+ }
+
public JmsMessage getMessage() {
return message;
}
@@ -79,15 +86,34 @@ public class JmsOutboundMessageDispatch {
this.presettle = presettle;
}
- @Override
- public String toString() {
- StringBuilder value = new StringBuilder();
+ public boolean isCompletionRequired() {
+ return completionRequired;
+ }
- value.append("JmsOutboundMessageDispatch {dispatchId = ");
- value.append(getProducerId());
- value.append("-");
- value.append(getDispatchId());
+ public void setCompletionRequired(boolean completionRequired) {
+ this.completionRequired = completionRequired;
+ }
- return value.toString();
+ @Override
+ public String toString() {
+ if (stringView == null) {
+ StringBuilder value = new StringBuilder();
+
+ value.append("JmsOutboundMessageDispatch {dispatchId = ");
+ value.append(getProducerId());
+ value.append("-");
+ value.append(getDispatchId());
+ value.append(", MessageID = ");
+ try {
+ value.append(message.getJMSMessageID());
+ } catch (Throwable e) {
+ value.append("<unknown>");
+ }
+ value.append(" }");
+
+ stringView = value.toString();
+ }
+
+ return stringView;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
index 22e204e..d2eb95c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsResource;
/**
@@ -32,6 +33,14 @@ public class DefaultProviderListener implements ProviderListener {
}
@Override
+ public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) {
+ }
+
+ @Override
+ public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause) {
+ }
+
+ @Override
public void onConnectionInterrupted(URI remoteURI) {
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
index 5c758ed..11a5f6b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsResource;
/**
@@ -36,6 +37,27 @@ public interface ProviderListener {
void onInboundMessage(JmsInboundMessageDispatch envelope);
/**
+ * Called when an outbound message dispatch that requested a completion callback
+ * has reached a state where the send can be considered successful based on the QoS
+ * level associated with the outbound message.
+ *
+ * @param envelope
+ * the original outbound message dispatch that is now complete.
+ */
+ void onCompletedMessageSend(JmsOutboundMessageDispatch envelope);
+
+ /**
+ * Called when an outbound message dispatch that requested a completion callback
+ * has reached a state where the send can be considered failed.
+ *
+ * @param envelope
+ * the original outbound message dispatch that should be treated as a failed send.
+ * @param cause
+ * the exception that describes the cause of the failed send.
+ */
+ void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause);
+
+ /**
* Called from a fault tolerant Provider instance to signal that the underlying
* connection to the Broker has been lost. The Provider will attempt to reconnect
* following this event unless closed.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index 8574e04..3a3d383 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -154,6 +154,16 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
}
@Override
+ public void onCompletedMessageSend(JmsOutboundMessageDispatch envelope) {
+ listener.onCompletedMessageSend(envelope);
+ }
+
+ @Override
+ public void onFailedMessageSend(JmsOutboundMessageDispatch envelope, Throwable cause) {
+ listener.onFailedMessageSend(envelope, cause);
+ }
+
+ @Override
public void onConnectionInterrupted(URI remoteURI) {
listener.onConnectionInterrupted(remoteURI);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index a8599ac..634cafd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -23,6 +23,7 @@ import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointState;
import org.slf4j.Logger;
@@ -158,7 +159,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
closeOrDetachEndpoint();
}
- // Process the close now, so that child close operations see the correct state.
+ // Process the close before moving on to closing down child resources
provider.pumpToProtonTransport();
handleResourceClosure(provider, error);
@@ -253,7 +254,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
}
@Override
- public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
+ public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
// Nothing do be done here, subclasses can override as needed.
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index b44e3b3..3e07b25 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -69,7 +69,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
}
@Override
- public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+ public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
LOG.trace("Started send chain for anonymous producer: {}", getProducerId());
// Force sends marked as asynchronous to be sent synchronous so that the temporary
@@ -91,7 +91,8 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
// We open a Fixed Producer instance with the target destination. Once it opens
// it will trigger the open event which will in turn trigger the send event.
- // If caching is disabled the created producer will be closed immediately.
+ // If caching is disabled the created producer will be closed immediately after
+ // the entire send chain has finished and the delivery has been acknowledged.
AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
builder.buildResource(new AnonymousSendRequest(request, builder, envelope));
@@ -100,9 +101,9 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
producerCache.put(envelope.getDestination(), builder.getResource());
}
- return true;
+ getParent().getProvider().pumpToProtonTransport(request);
} else {
- return producer.send(envelope, request);
+ producer.send(envelope, request);
}
}
@@ -135,6 +136,14 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
return new JmsProducerId(producerIdKey, -1, producerIdCount++);
}
+ @Override
+ public void addSendCompletionWatcher(AsyncResult watcher) {
+ throw new UnsupportedOperationException(
+ "The fallback producer parent should never have a watcher assigned.");
+ }
+
+ //----- AsyncResult objects used to complete the sends -------------------//
+
private abstract class AnonymousRequest extends WrappedAsyncResult {
protected final JmsOutboundMessageDispatch envelope;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index c15fd08..89586e3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -385,42 +385,35 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
@Override
- public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
- Delivery incoming = null;
- do {
- incoming = getEndpoint().current();
- if (incoming != null) {
- if (incoming.isReadable() && !incoming.isPartial()) {
- LOG.trace("{} has incoming Message(s).", this);
- try {
- if (processDelivery(incoming)) {
- // We processed a message, signal completion
- // of a message pull request if there is one.
- if (pullRequest != null) {
- pullRequest.onSuccess();
- pullRequest = null;
- }
- }
- } catch (Exception e) {
- throw IOExceptionSupport.create(e);
+ public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
+ if (delivery.isReadable() && !delivery.isPartial()) {
+ LOG.trace("{} has incoming Message(s).", this);
+ try {
+ if (processDelivery(delivery)) {
+ // We processed a message, signal completion
+ // of a message pull request if there is one.
+ if (pullRequest != null) {
+ pullRequest.onSuccess();
+ pullRequest = null;
}
- } else {
- LOG.trace("{} has a partial incoming Message(s), deferring.", this);
- incoming = null;
}
- } else {
- // We have exhausted the locally queued messages on this link.
- // Check if we tried to stop and have now run out of credit.
- if (getEndpoint().getRemoteCredit() <= 0) {
- if (stopRequest != null) {
- stopRequest.onSuccess();
- stopRequest = null;
- }
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ if (getEndpoint().current() == null) {
+ // We have exhausted the locally queued messages on this link.
+ // Check if we tried to stop and have now run out of credit.
+ if (getEndpoint().getRemoteCredit() <= 0) {
+ if (stopRequest != null) {
+ stopRequest.onSuccess();
+ stopRequest = null;
}
}
- } while (incoming != null);
+ }
- super.processDeliveryUpdates(provider);
+ super.processDeliveryUpdates(provider, delivery);
}
private boolean processDelivery(Delivery incoming) throws Exception {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
index b3e7501..2e25ad1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpEventSink.java
@@ -18,6 +18,8 @@ package org.apache.qpid.jms.provider.amqp;
import java.io.IOException;
+import org.apache.qpid.proton.engine.Delivery;
+
/**
* Interface used by classes that want to process AMQP events sent from
* the transport layer.
@@ -60,10 +62,12 @@ public interface AmqpEventSink {
*
* @param provider
* the AmqpProvider instance for easier access to fire events.
+ * @param delivery
+ * the Delivery that has an update to its state which needs to be handled.
*
* @throws IOException if an error occurs while processing the update.
*/
- void processDeliveryUpdates(AmqpProvider provider) throws IOException;
+ void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException;
/**
* Called when the Proton Engine signals an Flow related event has been triggered
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java
new file mode 100644
index 0000000..2ecf245
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpExceptionBuilder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+/**
+ * Used to provide a source for an exception based on some event such as
+ * operation timed out, etc.
+ */
+public interface AmqpExceptionBuilder {
+
+ /**
+ * Creates an exception appropriate to some failure condition
+ *
+ * @return a new Exception instance that describes a failure condition.
+ */
+ Exception createException();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index c354822..9233ce1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -18,10 +18,10 @@ package org.apache.qpid.jms.provider.amqp;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import javax.jms.IllegalStateException;
@@ -62,10 +62,12 @@ public class AmqpFixedProducer extends AmqpProducer {
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
- private final Set<Delivery> sent = new LinkedHashSet<Delivery>();
- private final LinkedList<InFlightSend> blocked = new LinkedList<InFlightSend>();
+ private final Map<Object, InFlightSend> sent = new LinkedHashMap<Object, InFlightSend>();
+ private final Map<Object, InFlightSend> blocked = new LinkedHashMap<Object, InFlightSend>();
private byte[] encodeBuffer = new byte[1024 * 8];
+ private AsyncResult sendCompletionWatcher;
+
public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
super(session, info);
}
@@ -86,29 +88,24 @@ public class AmqpFixedProducer extends AmqpProducer {
}
@Override
- public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+ public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
if (isClosed()) {
request.onFailure(new IllegalStateException("The MessageProducer is closed"));
}
if (getEndpoint().getCredit() <= 0) {
LOG.trace("Holding Message send until credit is available.");
- // Once a message goes into a held mode we no longer can send it async, so
- // we clear the async flag if set to avoid the sender never getting notified.
- envelope.setSendAsync(false);
InFlightSend send = new InFlightSend(envelope, request);
if (getSendTimeout() > JmsConnectionInfo.INFINITE) {
- send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(
- send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for credit to send Message", envelope.getMessage()));
+ send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send);
}
- blocked.addLast(send);
- return false;
+ blocked.put(envelope.getMessageId(), send);
+ getParent().getProvider().pumpToProtonTransport(request);
} else {
doSend(envelope, request);
- return true;
}
}
@@ -133,35 +130,54 @@ public class AmqpFixedProducer extends AmqpProducer {
delivery = getEndpoint().delivery(tag, 0, tag.length);
}
- delivery.setContext(request);
-
if (session.isTransacted()) {
- Binary amqpTxId = session.getTransactionContext().getAmqpTransactionId();
+ AmqpTransactionContext context = session.getTransactionContext();
+ Binary amqpTxId = context.getAmqpTransactionId();
TransactionalState state = new TransactionalState();
state.setTxnId(amqpTxId);
delivery.disposition(state);
+ context.registerTxProducer(this);
}
AmqpJmsMessageFacade amqpMessageFacade = (AmqpJmsMessageFacade) facade;
encodeAndSend(amqpMessageFacade.getAmqpMessage(), delivery);
+ AmqpProvider provider = getParent().getProvider();
+
+ InFlightSend send = null;
+ if (request instanceof InFlightSend) {
+ send = (InFlightSend) request;
+ } else {
+ send = new InFlightSend(envelope, request);
+
+ if (!presettle && getSendTimeout() != JmsConnectionInfo.INFINITE) {
+ send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send);
+ }
+ }
+
if (presettle) {
delivery.settle();
} else {
- sent.add(delivery);
+ sent.put(envelope.getMessageId(), send);
getEndpoint().advance();
}
- if (envelope.isSendAsync() || presettle) {
- request.onSuccess();
- } else if (getSendTimeout() != JmsConnectionInfo.INFINITE) {
- InFlightSend send = new InFlightSend(envelope, request);
-
- send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(
- send, getSendTimeout(), new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", envelope.getMessage()));
-
- // Update context so the incoming disposition can cancel any pending timeout
- delivery.setContext(send);
+ send.setDelivery(delivery);
+ delivery.setContext(send);
+
+ // Put it on the wire and let it fail if the connection is broken, if it does
+ // get written then continue on to determine when we should complete it.
+ if (provider.pumpToProtonTransport(request)) {
+ // For presettled messages we can just mark as successful and we are done, but
+ // for any other message we still track it until the remote settles. If the send
+ // was tagged as asynchronous we must mark the original request as complete but
+ // we still need to wait for the disposition before we can consider the send as
+ // having been successful.
+ if (presettle) {
+ send.onSuccess();
+ } else if (envelope.isSendAsync()) {
+ send.getOriginalRequest().onSuccess();
+ }
}
}
@@ -195,13 +211,16 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
public void processFlowUpdates(AmqpProvider provider) throws IOException {
if (!blocked.isEmpty() && getEndpoint().getCredit() > 0) {
- while (getEndpoint().getCredit() > 0 && !blocked.isEmpty()) {
+ Iterator<InFlightSend> blockedSends = blocked.values().iterator();
+ while (getEndpoint().getCredit() > 0 && blockedSends.hasNext()) {
LOG.trace("Dispatching previously held send");
- InFlightSend held = blocked.pop();
+ InFlightSend held = blockedSends.next();
try {
- doSend(held.envelope, held);
+ doSend(held.getEnvelope(), held);
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
+ } finally {
+ blockedSends.remove();
}
}
}
@@ -211,25 +230,18 @@ public class AmqpFixedProducer extends AmqpProducer {
getEndpoint().drained();
}
- // Once the pending sends queue is drained we can propagate the close request.
- if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) {
- super.close(closeRequest);
- }
-
super.processFlowUpdates(provider);
}
@Override
- public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
- List<Delivery> toRemove = new ArrayList<Delivery>();
-
- for (Delivery delivery : sent) {
- DeliveryState state = delivery.getRemoteState();
- if (state == null) {
- continue;
- }
+ public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
+ DeliveryState state = delivery.getRemoteState();
+ if (state != null) {
+ InFlightSend send = (InFlightSend) delivery.getContext();
+ Exception deliveryError = null;
Outcome outcome = null;
+
if (state instanceof TransactionalState) {
LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
outcome = ((TransactionalState) state).getOutcome();
@@ -240,14 +252,9 @@ public class AmqpFixedProducer extends AmqpProducer {
outcome = null;
}
- AsyncResult request = (AsyncResult) delivery.getContext();
- Exception deliveryError = null;
-
if (outcome instanceof Accepted) {
LOG.trace("Outcome of delivery was accepted: {}", delivery);
- if (request != null && !request.isComplete()) {
- request.onSuccess();
- }
+ send.onSuccess();
} else if (outcome instanceof Rejected) {
LOG.trace("Outcome of delivery was rejected: {}", delivery);
ErrorCondition remoteError = ((Rejected) outcome).getError();
@@ -265,21 +272,11 @@ public class AmqpFixedProducer extends AmqpProducer {
}
if (deliveryError != null) {
- if (request != null && !request.isComplete()) {
- request.onFailure(deliveryError);
- } else {
- connection.getProvider().fireNonFatalProviderException(deliveryError);
- }
+ send.onFailure(deliveryError);
}
-
- tagGenerator.returnTag(delivery.getTag());
- toRemove.add(delivery);
- delivery.settle();
}
- sent.removeAll(toRemove);
-
- super.processDeliveryUpdates(provider);
+ super.processDeliveryUpdates(provider, delivery);
}
public AmqpSession getSession() {
@@ -312,45 +309,45 @@ public class AmqpFixedProducer extends AmqpProducer {
error = new JMSException("Producer closed remotely before message transfer result was notified");
}
- for (Delivery delivery : sent) {
+ Collection<InFlightSend> inflightSends = new ArrayList<InFlightSend>(sent.values());
+ for (InFlightSend send : inflightSends) {
try {
- AsyncResult request = (AsyncResult) delivery.getContext();
-
- if (request != null && !request.isComplete()) {
- request.onFailure(error);
- }
-
- delivery.settle();
- tagGenerator.returnTag(delivery.getTag());
+ send.onFailure(error);
} catch (Exception e) {
- LOG.debug("Caught exception when failing pending send during remote producer closure: {}", delivery, e);
+ LOG.debug("Caught exception when failing pending send during remote producer closure: {}", send, e);
}
}
- sent.clear();
-
- for (InFlightSend blockedSend : blocked) {
+ Collection<InFlightSend> blockedSends = new ArrayList<InFlightSend>(blocked.values());
+ for (InFlightSend send : blockedSends) {
try {
- AsyncResult request = blockedSend.request;
- if (request != null && !request.isComplete()) {
- request.onFailure(error);
- }
+ send.onFailure(error);
} catch (Exception e) {
- LOG.debug("Caught exception when failing blocked send during remote producer closure: {}", blockedSend, e);
+ LOG.debug("Caught exception when failing blocked send during remote producer closure: {}", send, e);
}
}
+ }
- blocked.clear();
+ @Override
+ public void addSendCompletionWatcher(AsyncResult watcher) {
+ // If none pending signal done already.
+ // TODO - If we don't include blocked sends then update this.
+ if (blocked.isEmpty() && sent.isEmpty()) {
+ watcher.onSuccess();
+ } else {
+ this.sendCompletionWatcher = watcher;
+ }
}
//----- Class used to manage held sends ----------------------------------//
- private class InFlightSend implements AsyncResult {
+ private class InFlightSend implements AsyncResult, AmqpExceptionBuilder {
- public final JmsOutboundMessageDispatch envelope;
- public final AsyncResult request;
+ private final JmsOutboundMessageDispatch envelope;
+ private final AsyncResult request;
- public ScheduledFuture<?> requestTimeout;
+ private Delivery delivery;
+ private ScheduledFuture<?> requestTimeout;
public InFlightSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
this.envelope = envelope;
@@ -359,31 +356,95 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
public void onFailure(Throwable cause) {
- if (requestTimeout != null) {
- requestTimeout.cancel(false);
- requestTimeout = null;
- }
-
- blocked.remove(this);
+ handleSendCompletion(false);
- request.onFailure(cause);
+ if (request.isComplete()) {
+ // Asynchronous sends can still be awaiting a completion in which case we
+ // send to them otherwise send to the listener to be reported.
+ if (envelope.isCompletionRequired()) {
+ getParent().getProvider().getProviderListener().onFailedMessageSend(envelope, cause);
+ } else {
+ getParent().getProvider().fireNonFatalProviderException(IOExceptionSupport.create(cause));
+ }
+ } else {
+ request.onFailure(cause);
+ }
}
@Override
public void onSuccess() {
- if (requestTimeout != null) {
- requestTimeout.cancel(false);
- requestTimeout = null;
+ handleSendCompletion(true);
+
+ if (!request.isComplete()) {
+ request.onSuccess();
}
- blocked.remove(this);
+ if (envelope.isCompletionRequired()) {
+ getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope);
+ }
+ }
- request.onSuccess();
+ public void setRequestTimeout(ScheduledFuture<?> requestTimeout) {
+ if (this.requestTimeout != null) {
+ this.requestTimeout.cancel(false);
+ }
+
+ this.requestTimeout = requestTimeout;
+ }
+
+ public JmsOutboundMessageDispatch getEnvelope() {
+ return envelope;
+ }
+
+ public AsyncResult getOriginalRequest() {
+ return request;
+ }
+
+ public void setDelivery(Delivery delivery) {
+ this.delivery = delivery;
+ }
+
+ public Delivery getDelivery() {
+ return delivery;
}
@Override
public boolean isComplete() {
return request.isComplete();
}
+
+ private void handleSendCompletion(boolean successful) {
+ setRequestTimeout(null);
+
+ if (getDelivery() != null) {
+ sent.remove(envelope.getMessageId());
+ delivery.settle();
+ tagGenerator.returnTag(delivery.getTag());
+ } else {
+ blocked.remove(envelope.getMessageId());
+ }
+
+ // TODO - Should this take blocked sends into consideration.
+ // Signal the watcher that all pending sends have completed if one is registered
+ // and both the in-flight sends and blocked sends have completed.
+ if (sendCompletionWatcher != null && sent.isEmpty() && blocked.isEmpty()) {
+ sendCompletionWatcher.onSuccess();
+ }
+
+ // Once the pending sends queue is drained and all in-flight sends have been
+ // settled we can propagate the close request.
+ if (isAwaitingClose() && !isClosed() && blocked.isEmpty() && sent.isEmpty()) {
+ AmqpFixedProducer.super.close(closeRequest);
+ }
+ }
+
+ @Override
+ public Exception createException() {
+ if (delivery == null) {
+ return new JmsSendTimedOutException("Timed out waiting for credit to send Message", envelope.getMessage());
+ } else {
+ return new JmsSendTimedOutException("Timed out waiting for disposition of sent Message", envelope.getMessage());
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
index 74fe457..3ace6d2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -54,13 +54,10 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
* @param request
* The AsyncRequest that will be notified on send success or failure.
*
- * @return true if the producer had credit to send or false if there was no available
- * credit and the send needed to be deferred.
- *
* @throws IOException if an error occurs sending the message
* @throws JMSException if an error occurs while preparing the message for send.
*/
- public abstract boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException;
+ public abstract void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException;
/**
* @return true if this is an anonymous producer or false if fixed to a given destination.
@@ -92,4 +89,14 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo,
public void setPresettle(boolean presettle) {
this.presettle = presettle;
}
+
+ /**
+ * Allows a completion request to be added to this producer that will be notified
+ * once all outstanding sends have completed.
+ *
+ * In this change the fixed producer signals the watcher through onSuccess() once
+ * both its in-flight and its blocked sends are empty, and the transaction context
+ * registers a watcher on each of its tracked producers when rolling back.
+ *
+ * @param watcher
+ * The AsyncResult that will be signaled once this producer has no pending sends.
+ */
+ public abstract void addSendCompletionWatcher(AsyncResult watcher);
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 3bfe099..ca8a0e1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -59,6 +59,7 @@ import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Event.Type;
@@ -477,11 +478,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
producer = session.getProducer(producerId);
}
- boolean couldSend = producer.send(envelope, request);
- pumpToProtonTransport(request);
- if (couldSend && envelope.isSendAsync()) {
- request.onSuccess();
- }
+ producer.send(envelope, request);
} catch (Throwable t) {
request.onFailure(t);
}
@@ -816,7 +813,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
case DELIVERY:
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
if (amqpEventSink != null) {
- amqpEventSink.processDeliveryUpdates(this);
+ amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext());
}
break;
default:
@@ -1175,6 +1172,36 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
return null;
}
+ /**
+ * Schedules the automatic failure of a request once the given timeout elapses.
+ * When the scheduled task runs it fails the request with the exception supplied
+ * by the builder and then pumps the proton transport. The returned future can
+ * be used by the caller to cancel the scheduled failure.
+ *
+ * @param request
+ * The request that should be marked as failed when the timeout elapses.
+ * @param timeout
+ * The time to wait, in milliseconds, before marking the request as failed.
+ * @param builder
+ * An AmqpExceptionBuilder used to create the timed out exception.
+ *
+ * @return a {@link ScheduledFuture} for the scheduled task, or null when the
+ * timeout is {@link JmsConnectionInfo#INFINITE} and nothing was scheduled.
+ */
+ public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final AmqpExceptionBuilder builder) {
+ // An infinite timeout means no failure task is ever scheduled.
+ if (timeout == JmsConnectionInfo.INFINITE) {
+ return null;
+ }
+
+ final Runnable failRequest = new Runnable() {
+
+ @Override
+ public void run() {
+ request.onFailure(builder.createException());
+ pumpToProtonTransport();
+ }
+ };
+
+ return serializer.schedule(failRequest, timeout, TimeUnit.MILLISECONDS);
+ }
+
//----- Internal implementation ------------------------------------------//
private void checkClosed() throws ProviderClosedException {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 577c128..2d08bc7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -44,6 +44,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
private final AmqpSession session;
private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>();
+ private final Set<AmqpProducer> txProducers = new LinkedHashSet<AmqpProducer>();
private JmsTransactionId current;
private AmqpTransactionCoordinator coordinator;
@@ -85,7 +86,6 @@ public class AmqpTransactionContext implements AmqpResourceParent {
}
};
-
if (coordinator == null || coordinator.isClosed()) {
AmqpTransactionCoordinatorBuilder builder =
new AmqpTransactionCoordinatorBuilder(this, session.getResourceInfo());
@@ -115,7 +115,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
}
}
- public void commit(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {
+ public void commit(final JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {
if (!transactionInfo.getId().equals(current)) {
if (!transactionInfo.isInDoubt() && current == null) {
throw new IllegalStateException("Commit called with no active Transaction.");
@@ -128,29 +128,10 @@ public class AmqpTransactionContext implements AmqpResourceParent {
preCommit();
- LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
- coordinator.discharge(current, new AsyncResult() {
-
- @Override
- public void onSuccess() {
- current = null;
- postCommit();
- request.onSuccess();
- }
-
- @Override
- public void onFailure(Throwable result) {
- current = null;
- postCommit();
- request.onFailure(result);
- }
-
- @Override
- public boolean isComplete() {
- return current == null;
- }
+ DischargeCompletion dischargeResult = new DischargeCompletion(request, true);
- }, true);
+ LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
+ coordinator.discharge(current, dischargeResult, true);
}
public void rollback(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception {
@@ -167,29 +148,17 @@ public class AmqpTransactionContext implements AmqpResourceParent {
preRollback();
- LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current);
- coordinator.discharge(current, new AsyncResult() {
-
- @Override
- public void onSuccess() {
- current = null;
- postRollback();
- request.onSuccess();
- }
-
- @Override
- public void onFailure(Throwable result) {
- current = null;
- postRollback();
- request.onFailure(result);
- }
+ DischargeCompletion dischargeResult = new DischargeCompletion(request, false);
- @Override
- public boolean isComplete() {
- return current == null;
+ if (txProducers.isEmpty()) {
+ LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current);
+ coordinator.discharge(current, dischargeResult, false);
+ } else {
+ SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), false);
+ for (AmqpProducer producer : txProducers) {
+ producer.addSendCompletionWatcher(producersSendCompletion);
}
-
- }, false);
+ }
}
//----- Context utility methods ------------------------------------------//
@@ -198,6 +167,10 @@ public class AmqpTransactionContext implements AmqpResourceParent {
txConsumers.add(consumer);
}
+ /**
+ * Adds the given producer to the set of producers tracked by this context.
+ * Rollback consults that set: when it is not empty, a send completion watcher
+ * is added to every tracked producer instead of discharging immediately.
+ *
+ * @param producer
+ * the producer to track; adding the same producer again has no further
+ * effect because the backing collection is a Set.
+ */
+ public void registerTxProducer(AmqpProducer producer) {
+ txProducers.add(producer);
+ }
+
public AmqpSession getSession() {
return session;
}
@@ -243,6 +216,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
}
txConsumers.clear();
+ txProducers.clear();
}
private void postRollback() {
@@ -251,6 +225,7 @@ public class AmqpTransactionContext implements AmqpResourceParent {
}
txConsumers.clear();
+ txProducers.clear();
}
//----- Resource Parent implementation -----------------------------------//
@@ -273,4 +248,94 @@ public class AmqpTransactionContext implements AmqpResourceParent {
public AmqpProvider getProvider() {
return session.getProvider();
}
+
+ //----- Completion for Commit or Rollback operation ----------------------//
+
+ /**
+ * AsyncResult passed to the coordinator for a discharge. On either outcome it
+ * first clears the current transaction ID and runs the post-commit or
+ * post-rollback handling, then forwards the outcome to the wrapped request.
+ */
+ private class DischargeCompletion implements AsyncResult {
+
+ private final AsyncResult request;
+ private final boolean commit;
+
+ /**
+ * @param request
+ * the request to notify once the context cleanup has run.
+ * @param commit
+ * true to run the post-commit handling, false for post-rollback.
+ */
+ public DischargeCompletion(AsyncResult request, boolean commit) {
+ this.request = request;
+ this.commit = commit;
+ }
+
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
+ }
+
+ @Override
+ public void onSuccess() {
+ resetContext();
+ request.onSuccess();
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ resetContext();
+ request.onFailure(result);
+ }
+
+ // Clears the active transaction and runs the matching post-discharge step.
+ private void resetContext() {
+ current = null;
+
+ if (!commit) {
+ postRollback();
+ return;
+ }
+
+ postCommit();
+ }
+ }
+
+ //----- Completion result for Producers ----------------------------------//
+
+ /**
+ * AsyncResult added as the send completion watcher on every producer tracked
+ * by this context. Each producer reports once; when the last one has reported
+ * the transaction is discharged through the coordinator using the wrapped
+ * DischargeCompletion. A failure reported by any producer forces the discharge
+ * to be a rollback.
+ */
+ @SuppressWarnings("unused")
+ private class SendCompletion implements AsyncResult {
+
+ // Number of producers that have not yet reported their sends as complete.
+ private int pendingCompletions;
+
+ private final JmsTransactionInfo info;
+ private final DischargeCompletion request;
+
+ // Requested discharge mode; flipped to false once any producer reports a failure.
+ private boolean commit;
+
+ /**
+ * @param info
+ * the transaction being discharged (stored but not otherwise read here).
+ * @param request
+ * the completion handed to the coordinator when the discharge is issued.
+ * @param pendingCompletions
+ * the number of producers expected to report before discharging.
+ * @param commit
+ * true to commit once all producers report success, false to roll back.
+ */
+ public SendCompletion(JmsTransactionInfo info, DischargeCompletion request, int pendingCompletions, boolean commit) {
+ this.info = info;
+ this.request = request;
+ this.pendingCompletions = pendingCompletions;
+ this.commit = commit;
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ if (--pendingCompletions == 0) {
+ try {
+ // Log the enclosing context (not this watcher) and include the TX ID placeholder.
+ LOG.trace("TX Context[{}] rolling back current TX[[{}]]", AmqpTransactionContext.this, current);
+ coordinator.discharge(current, request, false);
+ } catch (Throwable error) {
+ request.onFailure(error);
+ }
+ } else {
+ // Other producers are still pending; remember that the final discharge must roll back.
+ commit = false;
+ }
+ }
+
+ @Override
+ public void onSuccess() {
+ if (--pendingCompletions == 0) {
+ try {
+ // Log the enclosing context (not this watcher) and include the TX ID placeholder.
+ LOG.trace("TX Context[{}] {} current TX[[{}]]", AmqpTransactionContext.this, commit ? "committing" : "rolling back", current);
+ coordinator.discharge(current, request, commit);
+ } catch (Throwable error) {
+ request.onFailure(error);
+ }
+ }
+ }
+
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org