You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/17 00:05:56 UTC
[2/2] qpid-jms git commit: QPIDJMS-126 Adds support for dealing with
the coordinator link being closed either during tx operations or on commit /
rollback. Handling of message acknowledge dropping during link downtime is
not implemented yet.
QPIDJMS-126 Adds support for dealing with the coordinator link being
closed either during tx operations or on commit / rollback. Handling of
message acknowledge dropping during link downtime is not implemented
yet.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/974a8510
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/974a8510
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/974a8510
Branch: refs/heads/master
Commit: 974a8510bf95d48fe9174c25ad178198145771e1
Parents: 69d2273
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 16 18:05:45 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 16 18:05:45 2015 -0400
----------------------------------------------------------------------
.../qpid/jms/JmsLocalTransactionContext.java | 3 -
.../qpid/jms/meta/JmsAbstractResourceId.java | 11 +
.../org/apache/qpid/jms/meta/JmsResourceId.java | 16 ++
.../jms/provider/amqp/AmqpAbstractResource.java | 16 +-
.../qpid/jms/provider/amqp/AmqpConsumer.java | 10 +-
.../jms/provider/amqp/AmqpFixedProducer.java | 7 +
.../qpid/jms/provider/amqp/AmqpSession.java | 14 +-
.../qpid/jms/provider/amqp/AmqpSupport.java | 4 +
.../provider/amqp/AmqpTransactionContext.java | 248 ++++++++--------
.../amqp/AmqpTransactionCoordinator.java | 209 ++++++++++++++
.../amqp/builders/AmqpConnectionBuilder.java | 2 +-
.../amqp/builders/AmqpSessionBuilder.java | 30 --
.../builders/AmqpTransactionContextBuilder.java | 66 -----
.../AmqpTransactionCoordinatorBuilder.java | 66 +++++
.../integration/ConnectionIntegrationTest.java | 8 +-
.../QueueBrowserIntegrationTest.java | 4 +-
.../jms/integration/SessionIntegrationTest.java | 4 +-
.../TransactionsIntegrationTest.java | 280 ++++++++++---------
.../failover/FailoverIntegrationTest.java | 23 +-
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 131 +++++++++
20 files changed, 745 insertions(+), 407 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index cf5043e..40a4ad1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -284,7 +284,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
// work from getting queued on the provider needlessly.
lock.writeLock().tryLock();
try {
- LOG.info("onConnectionRecovery starting new transaction to replace old one.");
// On connection recover we open a new TX to replace the existing one.
transactionId = connection.getNextTransactionId();
@@ -293,8 +292,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
provider.create(transaction, request);
request.sync();
- LOG.info("onConnectionRecovery started new transaction to replace old one.");
-
// It is ok to use the newly created TX from here if the TX never had any
// work done within it otherwise we want the next commit to fail.
failed = hasWork;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java
index 549e4c9..3f891e2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsAbstractResourceId.java
@@ -22,6 +22,7 @@ package org.apache.qpid.jms.meta;
public abstract class JmsAbstractResourceId implements JmsResourceId {
protected transient Object providerHint;
+ protected transient Object providerContext;
protected transient int hashCode;
@Override
@@ -33,4 +34,14 @@ public abstract class JmsAbstractResourceId implements JmsResourceId {
public Object getProviderHint() {
return providerHint;
}
+
+ @Override
+ public void setProviderContext(Object context) {
+ this.providerContext = context;
+ }
+
+ @Override
+ public Object getProviderContext() {
+ return providerContext;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
index 5dd4361..d6de5fc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
@@ -37,4 +37,20 @@ public interface JmsResourceId {
*/
Object getProviderHint();
+ /**
+ * Allows a Provider to embed a bit of Context for later use. The context
+ * can be some state data needed between asynchronous requests etc.
+ *
+ * @param value
+ * The value to add as context for this Id.
+ */
+ void setProviderContext(Object value);
+
+ /**
+ * Return the previously stored Provider context object.
+ *
+ * @return the previously stored Provider context object.
+ */
+ Object getProviderContext();
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 5474d75..fe17a63 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -105,9 +105,9 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
getEndpoint().free();
getEndpoint().setContext(null);
- if (this.closeRequest != null) {
- this.closeRequest.onSuccess();
- this.closeRequest = null;
+ if (closeRequest != null) {
+ closeRequest.onSuccess();
+ closeRequest = null;
}
}
@@ -145,11 +145,15 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
//----- Access methods ---------------------------------------------------//
public E getEndpoint() {
- return this.endpoint;
+ return endpoint;
}
public R getResourceInfo() {
- return this.resourceInfo;
+ return resourceInfo;
+ }
+
+ public AmqpResourceParent getParent() {
+ return parent;
}
//----- Endpoint state access methods ------------------------------------//
@@ -163,7 +167,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
}
public boolean isAwaitingClose() {
- return this.closeRequest != null;
+ return closeRequest != null;
}
public EndpointState getLocalState() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index d2df866..9d95fa4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -523,16 +523,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
public void preRollback() {
}
- /**
- * @throws Exception if an error occurs while performing this action.
- */
- public void postCommit() throws Exception {
+ public void postCommit() {
}
- /**
- * @throws Exception if an error occurs while performing this action.
- */
- public void postRollback() throws Exception {
+ public void postRollback() {
}
//----- Inner classes used in message pull operations --------------------//
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 47cee33..e2dbc39 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -100,6 +100,13 @@ public class AmqpFixedProducer extends AmqpProducer {
}
private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+ // If the transaction has failed due to remote termination etc then we just indicate
+ // the send has succeeded until a new transaction is started.
+ if (session.isTransacted() && session.isTransactionFailed()) {
+ request.onSuccess();
+ return;
+ }
+
JmsMessageFacade facade = envelope.getMessage().getFacade();
LOG.trace("Producer sending message: {}", envelope);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 28fe3ea..8961258 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -42,7 +42,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
private final AmqpConnection connection;
- private AmqpTransactionContext txContext;
+ private final AmqpTransactionContext txContext;
private final Map<JmsConsumerId, AmqpConsumer> consumers = new HashMap<JmsConsumerId, AmqpConsumer>();
@@ -50,6 +50,12 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
super(info, session, connection);
this.connection = connection;
+
+ if (info.isTransacted()) {
+ txContext = new AmqpTransactionContext(this, info);
+ } else {
+ txContext = null;
+ }
}
/**
@@ -198,8 +204,6 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
if (resource instanceof AmqpConsumer) {
AmqpConsumer consumer = (AmqpConsumer) resource;
consumers.put(consumer.getConsumerId(), consumer);
- } else if (resource instanceof AmqpTransactionContext) {
- txContext = (AmqpTransactionContext) resource;
} else {
connection.addChildResource(resource);
}
@@ -262,6 +266,10 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
return getResourceInfo().isTransacted();
}
+ public boolean isTransactionFailed() {
+ return txContext == null ? false : txContext.isTransactionFailed();
+ }
+
boolean isAsyncAck() {
return getResourceInfo().isSendAcksAsync() || isTransacted();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index 624a26b..1ec2bfa 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -23,10 +23,12 @@ import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
+import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.provider.ProviderRedirectedException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.transaction.TransactionErrors;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ConnectionError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -98,6 +100,8 @@ public class AmqpSupport {
remoteError = new JMSSecurityException(message);
} else if (error.equals(AmqpError.NOT_FOUND)) {
remoteError = new InvalidDestinationException(message);
+ } else if (error.equals(TransactionErrors.TRANSACTION_ROLLBACK)) {
+ remoteError = new TransactionRolledBackException(message);
} else if (error.equals(ConnectionError.REDIRECT)) {
remoteError = createRedirectException(error, message, errorCondition);
} else if (error.equals(AmqpError.INVALID_FIELD)) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 648b896..eb3447c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -17,27 +17,16 @@
package org.apache.qpid.jms.provider.amqp;
import java.io.IOException;
-import java.nio.BufferOverflowException;
import java.util.LinkedHashSet;
import java.util.Set;
import javax.jms.IllegalStateException;
-import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.jms.provider.amqp.builders.AmqpTransactionCoordinatorBuilder;
import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.transaction.Declare;
-import org.apache.qpid.proton.amqp.transaction.Declared;
-import org.apache.qpid.proton.amqp.transaction.Discharge;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,22 +36,15 @@ import org.slf4j.LoggerFactory;
* The Transaction will carry a JmsTransactionId while the Transaction is open, once a
* transaction has been committed or rolled back the Transaction Id is cleared.
*/
-public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, Sender> {
+public class AmqpTransactionContext implements AmqpResourceParent {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
- private static final Boolean ROLLBACK_MARKER = Boolean.FALSE;
- private static final Boolean COMMIT_MARKER = Boolean.TRUE;
-
- private final byte[] OUTBOUND_BUFFER = new byte[64];
-
private final AmqpSession session;
- private JmsTransactionId current;
- private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();
private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>();
- private Delivery pendingDelivery;
- private AsyncResult pendingRequest;
+ private JmsTransactionId current;
+ private AmqpTransactionCoordinator coordinator;
/**
* Creates a new AmqpTransactionContext instance.
@@ -71,131 +53,146 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
* The session that owns this transaction context.
* @param resourceInfo
* The resourceInfo that defines this transaction context.
- * @param sender
- * The local sender endpoint for this transaction context.
*/
- public AmqpTransactionContext(AmqpSession session, JmsSessionInfo resourceInfo, Sender sender) {
- super(resourceInfo, sender, session);
-
+ public AmqpTransactionContext(AmqpSession session, JmsSessionInfo resourceInfo) {
this.session = session;
}
- @Override
- public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
- try {
- if (pendingDelivery != null && pendingDelivery.remotelySettled()) {
- DeliveryState state = pendingDelivery.getRemoteState();
- if (state instanceof Declared) {
- Declared declared = (Declared) state;
- current.setProviderHint(declared.getTxnId());
- pendingDelivery.settle();
- LOG.debug("New TX started: {}", current.getProviderHint());
- AsyncResult request = this.pendingRequest;
- this.pendingRequest = null;
- this.pendingDelivery = null;
- request.onSuccess();
- } else if (state instanceof Rejected) {
- LOG.debug("Last TX request failed: {}", current.getProviderHint());
- pendingDelivery.settle();
- Rejected rejected = (Rejected) state;
- Exception cause = AmqpSupport.convertToException(rejected.getError());
- TransactionRolledBackException ex = new TransactionRolledBackException(cause.getMessage());
- AsyncResult request = this.pendingRequest;
- this.current = null;
- this.pendingRequest = null;
- this.pendingDelivery = null;
- postRollback();
- request.onFailure(ex);
- } else {
- LOG.debug("Last TX request succeeded: {}", current.getProviderHint());
- pendingDelivery.settle();
- AsyncResult request = this.pendingRequest;
- if (pendingDelivery.getContext() != null) {
- if (pendingDelivery.getContext().equals(COMMIT_MARKER)) {
- postCommit();
- } else {
- postRollback();
- }
- }
- this.current = null;
- this.pendingRequest = null;
- this.pendingDelivery = null;
- request.onSuccess();
- }
- }
-
- super.processDeliveryUpdates(provider);
- } catch (Exception e) {
- throw IOExceptionSupport.create(e);
- }
- }
-
- public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
+ public void begin(final JmsTransactionId txId, final AsyncResult request) throws Exception {
if (current != null) {
throw new IOException("Begin called while a TX is still Active.");
}
- Message message = Message.Factory.create();
- Declare declare = new Declare();
- message.setBody(new AmqpValue(declare));
+ final AsyncResult declareCompletion = new AsyncResult() {
+
+ @Override
+ public void onSuccess() {
+ current = txId;
+ request.onSuccess();
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ current = null;
+ request.onFailure(result);
+ }
+
+ @Override
+ public boolean isComplete() {
+ return current != null;
+ }
+ };
+
+
+ if (coordinator == null || coordinator.isClosed()) {
+ AmqpTransactionCoordinatorBuilder builder =
+ new AmqpTransactionCoordinatorBuilder(this, session.getResourceInfo());
+ builder.buildResource(new AsyncResult() {
+
+ @Override
+ public void onSuccess() {
+ try {
+ coordinator.declare(txId, declareCompletion);
+ } catch (Exception e) {
+ request.onFailure(e);
+ }
+ }
- pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
- pendingRequest = request;
- current = txId;
+ @Override
+ public void onFailure(Throwable result) {
+ request.onFailure(result);
+ }
- sendTxCommand(message);
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
+ }
+ });
+ } else {
+ coordinator.declare(txId, declareCompletion);
+ }
}
+ /**
+ * Commits the currently active transaction.
+ *
+ * Runs the pre-commit step for the registered consumers and then asks the
+ * coordinator to discharge the current transaction as a commit. Whether the
+ * discharge succeeds or fails, the current transaction Id is cleared and the
+ * post-commit step runs before the result is passed on to the given request.
+ *
+ * @param request
+ * The request to notify of the outcome of the commit.
+ *
+ * @throws Exception if no transaction is active (IllegalStateException) or the
+ * coordinator fails while issuing the discharge.
+ */
- public void commit(AsyncResult request) throws Exception {
+ public void commit(final AsyncResult request) throws Exception {
if (current == null) {
throw new IllegalStateException("Commit called with no active Transaction.");
}
preCommit();
- Message message = Message.Factory.create();
- Discharge discharge = new Discharge();
- discharge.setFail(false);
- discharge.setTxnId((Binary) current.getProviderHint());
- message.setBody(new AmqpValue(discharge));
+ // Two arguments need two "{}" placeholders, otherwise the TX Id is never logged.
+ LOG.trace("TX Context[{}] committing current TX[{}]", this, current);
+ coordinator.discharge(current, new AsyncResult() {
+
+ @Override
+ public void onSuccess() {
+ current = null;
+ postCommit();
+ request.onSuccess();
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ current = null;
+ postCommit();
+ request.onFailure(result);
+ }
- pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
- pendingDelivery.setContext(COMMIT_MARKER);
- pendingRequest = request;
+ @Override
+ public boolean isComplete() {
+ return current == null;
+ }
- sendTxCommand(message);
+ }, true);
}
+ /**
+ * Rolls back the currently active transaction.
+ *
+ * Runs the pre-rollback step for the registered consumers and then asks the
+ * coordinator to discharge the current transaction as a rollback. Whether the
+ * discharge succeeds or fails, the current transaction Id is cleared and the
+ * post-rollback step runs before the result is passed on to the given request.
+ *
+ * @param request
+ * The request to notify of the outcome of the rollback.
+ *
+ * @throws Exception if no transaction is active (IllegalStateException) or the
+ * coordinator fails while issuing the discharge.
+ */
- public void rollback(AsyncResult request) throws Exception {
+ public void rollback(final AsyncResult request) throws Exception {
if (current == null) {
throw new IllegalStateException("Rollback called with no active Transaction.");
}
preRollback();
- Message message = Message.Factory.create();
- Discharge discharge = new Discharge();
- discharge.setFail(true);
- discharge.setTxnId((Binary) current.getProviderHint());
- message.setBody(new AmqpValue(discharge));
+ // Two arguments need two "{}" placeholders, otherwise the TX Id is never logged.
+ LOG.trace("TX Context[{}] rolling back current TX[{}]", this, current);
+ coordinator.discharge(current, new AsyncResult() {
- pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
- pendingDelivery.setContext(ROLLBACK_MARKER);
- pendingRequest = request;
+ @Override
+ public void onSuccess() {
+ current = null;
+ postRollback();
+ request.onSuccess();
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ current = null;
+ postRollback();
+ request.onFailure(result);
+ }
- sendTxCommand(message);
+ @Override
+ public boolean isComplete() {
+ return current == null;
+ }
+
+ }, false);
}
+ //----- Context utility methods ------------------------------------------//
+
public void registerTxConsumer(AmqpConsumer consumer) {
- this.txConsumers.add(consumer);
+ txConsumers.add(consumer);
}
public AmqpSession getSession() {
- return this.session;
+ return session;
}
public JmsTransactionId getTransactionId() {
- return this.current;
+ return current;
+ }
+
+ public boolean isTransactionFailed() {
+ return coordinator == null ? false : coordinator.isClosed();
}
public Binary getAmqpTransactionId() {
@@ -211,19 +208,21 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
return this.session.getSessionId() + ": txContext";
}
- private void preCommit() throws Exception {
+ //----- Transaction pre / post completion --------------------------------//
+
+ private void preCommit() {
for (AmqpConsumer consumer : txConsumers) {
consumer.preCommit();
}
}
- private void preRollback() throws Exception {
+ private void preRollback() {
for (AmqpConsumer consumer : txConsumers) {
consumer.preRollback();
}
}
- private void postCommit() throws Exception {
+ private void postCommit() {
for (AmqpConsumer consumer : txConsumers) {
consumer.postCommit();
}
@@ -231,7 +230,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
txConsumers.clear();
}
- private void postRollback() throws Exception {
+ private void postRollback() {
for (AmqpConsumer consumer : txConsumers) {
consumer.postRollback();
}
@@ -239,20 +238,19 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo,
txConsumers.clear();
}
- private void sendTxCommand(Message message) throws IOException {
- int encodedSize = 0;
- byte[] buffer = OUTBOUND_BUFFER;
- while (true) {
- try {
- encodedSize = message.encode(buffer, 0, buffer.length);
- break;
- } catch (BufferOverflowException e) {
- buffer = new byte[buffer.length * 2];
- }
+ //----- Resource Parent event handlers -----------------------------------//
+
+ @Override
+ public void addChildResource(AmqpResource resource) {
+ if (resource instanceof AmqpTransactionCoordinator) {
+ coordinator = (AmqpTransactionCoordinator) resource;
}
+ }
- Sender sender = getEndpoint();
- sender.send(buffer, 0, encodedSize);
- sender.advance();
+ @Override
+ public void removeChildResource(AmqpResource resource) {
+ // We don't clear the coordinator link so that we can refer to it
+ // to check if the current TX has failed due to link closed during
+ // normal operations.
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
new file mode 100644
index 0000000..12e824a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the AMQP Transaction coordinator link used by the transaction context
+ * of a session to control the lifetime of a given transaction.
+ */
+public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionInfo, Sender> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
+
+ private static final Boolean ROLLBACK_MARKER = Boolean.FALSE;
+ private static final Boolean COMMIT_MARKER = Boolean.TRUE;
+
+ private final byte[] OUTBOUND_BUFFER = new byte[64];
+
+ private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();
+
+ private Delivery pendingDelivery;
+ private AsyncResult pendingRequest;
+
+ public AmqpTransactionCoordinator(JmsSessionInfo resourceInfo, Sender endpoint, AmqpResourceParent parent) {
+ super(resourceInfo, endpoint, parent);
+ }
+
+ @Override
+ public void processDeliveryUpdates(AmqpProvider provider) throws IOException { // completes the pending declare / discharge once its delivery is remotely settled
+ try {
+ if (pendingDelivery != null && pendingDelivery.remotelySettled()) {
+ DeliveryState state = pendingDelivery.getRemoteState();
+ JmsTransactionId txId = (JmsTransactionId) pendingDelivery.getContext(); // stored on the delivery by declare() / discharge()
+ if (state instanceof Declared) {
+ LOG.debug("New TX started: {}", txId);
+ Declared declared = (Declared) state;
+ txId.setProviderHint(declared.getTxnId()); // discharge() later reads this hint back as the AMQP txn-id
+ pendingRequest.onSuccess();
+ } else if (state instanceof Rejected) {
+ LOG.debug("Last TX request failed: {}", txId);
+ Rejected rejected = (Rejected) state;
+ Exception cause = AmqpSupport.convertToException(rejected.getError());
+ JMSException failureCause = null;
+ if (txId.getProviderContext() == COMMIT_MARKER) { // identity check against the marker discharge() stored for a commit
+ failureCause = new TransactionRolledBackException(cause.getMessage());
+ } else {
+ failureCause = new JMSException(cause.getMessage());
+ }
+
+ pendingRequest.onFailure(failureCause);
+ } else { // any other remote state is reported as success
+ LOG.debug("Last TX request succeeded: {}", txId);
+ pendingRequest.onSuccess();
+ }
+
+ // Reset state for next TX action.
+ pendingDelivery.settle(); // NOTE(review): the reset runs after the completion callback; a callback that synchronously issued another declare / discharge would have its new pending state cleared here -- confirm callers never re-enter
+ pendingRequest = null;
+ pendingDelivery = null;
+ }
+
+ super.processDeliveryUpdates(provider);
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e); // every failure is surfaced as an IOException
+ }
+ }
+
+ /**
+ * Starts a new transaction by sending an AMQP Declare to the remote coordinator.
+ *
+ * The given transaction Id is stored as the context of the outgoing delivery and
+ * the request is held as the pending request, so both are available when the
+ * outcome of the delivery is handled in processDeliveryUpdates. If the coordinator
+ * link is already closed the request is failed with a JMSException and nothing
+ * is sent.
+ *
+ * @param txId
+ * The transaction Id to declare; it must not already carry a provider hint.
+ * @param request
+ * The request to notify of the result of the declare.
+ *
+ * @throws Exception if the Id already carries a provider hint (IllegalStateException)
+ * or the Declare message cannot be sent.
+ */
+ public void declare(JmsTransactionId txId, AsyncResult request) throws Exception {
+ if (txId.getProviderHint() != null) {
+ throw new IllegalStateException("Declare called while a TX is still Active.");
+ }
+
+ if (isClosed()) {
+ request.onFailure(new JMSException("Cannot start new transaction: Coordinator remotely closed"));
+ return;
+ }
+
+ Message message = Message.Factory.create();
+ Declare declare = new Declare();
+ message.setBody(new AmqpValue(declare));
+
+ // Remember which TX and which request this delivery belongs to before sending.
+ pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
+ pendingDelivery.setContext(txId);
+ pendingRequest = request;
+
+ sendTxCommand(message);
+ }
+
+ /**
+ * Ends a transaction by sending an AMQP Discharge to the remote coordinator.
+ *
+ * If the coordinator link is already closed nothing is sent and the request is
+ * failed immediately: with a TransactionRolledBackException for a commit, or a
+ * JMSException for a rollback. Otherwise the kind of discharge is recorded on the
+ * transaction Id as its provider context, and the Id and request are held as the
+ * pending state for processDeliveryUpdates.
+ *
+ * @param txId
+ * The transaction Id to discharge; it must carry the provider hint set
+ * when the transaction was declared.
+ * @param request
+ * The request to notify of the result of the discharge.
+ * @param commit
+ * true to commit the transaction, false to roll it back.
+ *
+ * @throws Exception if the Id carries no provider hint (IllegalStateException)
+ * or the Discharge message cannot be sent.
+ */
+ public void discharge(JmsTransactionId txId, AsyncResult request, boolean commit) throws Exception {
+ if (txId.getProviderHint() == null) {
+ throw new IllegalStateException("Discharge called with no active Transaction.");
+ }
+
+ if (isClosed()) {
+ Exception failureCause = null;
+
+ if (commit) {
+ failureCause = new TransactionRolledBackException("Transaction in doubt: Coordinator remotely closed");
+ } else {
+ failureCause = new JMSException("Rollback cannot complete: Coordinator remotely closed");
+ }
+
+ request.onFailure(failureCause);
+ return;
+ }
+
+ // Store the context of this action in the transaction ID for later completion.
+ txId.setProviderContext(commit ? COMMIT_MARKER : ROLLBACK_MARKER);
+
+ Message message = Message.Factory.create();
+ Discharge discharge = new Discharge();
+ discharge.setFail(!commit);
+ discharge.setTxnId((Binary) txId.getProviderHint());
+ message.setBody(new AmqpValue(discharge));
+
+ // Remember which TX and which request this delivery belongs to before sending.
+ pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
+ pendingDelivery.setContext(txId);
+ pendingRequest = request;
+
+ sendTxCommand(message);
+ }
+
+ //----- Base class overrides ---------------------------------------------//
+
+ @Override
+ public void remotelyClosed(AmqpProvider provider) { // fails any pending TX request, detaches from the parent and closes / frees the link
+
+ Exception txnError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition()); // NOTE(review): getEndpoint() is dereferenced here, before the null check further down
+
+ // Alert any pending operation that the link failed to complete the pending
+ // begin / commit / rollback operation.
+ if (pendingRequest != null) {
+ pendingRequest.onFailure(txnError);
+ pendingRequest = null; // NOTE(review): pendingDelivery stays set; processDeliveryUpdates() dereferences pendingRequest if that delivery later reports settled -- confirm this cannot happen after close
+ }
+
+ // Override the base class version because we do not want to propagate
+ // an error up to the client if remote close happens as that is an
+ // acceptable way for the remote to indicate the discharge could not
+ // be applied.
+
+ if (getParent() != null) {
+ getParent().removeChildResource(this);
+ }
+
+ if (getEndpoint() != null) {
+ getEndpoint().close();
+ getEndpoint().free();
+ }
+
+ LOG.debug("Transaction Coordinator link {} was remotely closed", getResourceInfo());
+ }
+
+ //----- Internal implementation ------------------------------------------//
+
+ /**
+ * Encodes the given message and sends it over the coordinator link.
+ *
+ * @param message
+ * The transaction command message to send.
+ *
+ * @throws IOException if the command cannot be sent.
+ */
+ private void sendTxCommand(Message message) throws IOException {
+ // Start with the shared buffer and fall back to ever larger temporary
+ // arrays until the encoded message fits.
+ byte[] target = OUTBOUND_BUFFER;
+ int written = 0;
+ boolean encoded = false;
+ while (!encoded) {
+ try {
+ written = message.encode(target, 0, target.length);
+ encoded = true;
+ } catch (BufferOverflowException overflow) {
+ target = new byte[target.length * 2];
+ }
+ }
+
+ // Hand the encoded bytes to the link, then advance it.
+ Sender link = getEndpoint();
+ link.send(target, 0, written);
+ link.advance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index f4c290f..cad66a3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -37,7 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Resource builder responsible for creating and opening an AmqpConnectionSession instance.
+ * Resource builder responsible for creating and opening an AmqpConnection instance.
*/
public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, AmqpProvider, JmsConnectionInfo, Connection> {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
index d49f64c..6cb9819 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpSessionBuilder.java
@@ -17,7 +17,6 @@
package org.apache.qpid.jms.provider.amqp.builders;
import org.apache.qpid.jms.meta.JmsSessionInfo;
-import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.proton.engine.Session;
@@ -32,35 +31,6 @@ public class AmqpSessionBuilder extends AmqpResourceBuilder<AmqpSession, AmqpCon
}
@Override
- public void buildResource(final AsyncResult request) {
-
- AsyncResult opened = request;
-
- if (getResourceInfo().isTransacted()) {
- opened = new AsyncResult() {
-
- @Override
- public void onSuccess() {
- AmqpTransactionContextBuilder builder = new AmqpTransactionContextBuilder(getResource(), getResourceInfo());
- builder.buildResource(request);
- }
-
- @Override
- public void onFailure(Throwable result) {
- request.onFailure(result);
- }
-
- @Override
- public boolean isComplete() {
- return request.isComplete();
- }
- };
- }
-
- super.buildResource(opened);
- }
-
- @Override
protected Session createEndpoint(JmsSessionInfo resourceInfo) {
long outgoingWindow = getParent().getProvider().getSessionOutgoingWindow();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java
deleted file mode 100644
index 4f9466d..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionContextBuilder.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp.builders;
-
-import org.apache.qpid.jms.meta.JmsSessionInfo;
-import org.apache.qpid.jms.provider.amqp.AmqpSession;
-import org.apache.qpid.jms.provider.amqp.AmqpTransactionContext;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.transaction.Coordinator;
-import org.apache.qpid.proton.amqp.transaction.TxnCapability;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.Sender;
-
-/**
- * Resource builder responsible for creating and opening an AmqpTemporaryDestination instance.
- */
-public class AmqpTransactionContextBuilder extends AmqpResourceBuilder<AmqpTransactionContext, AmqpSession, JmsSessionInfo, Sender> {
-
- public AmqpTransactionContextBuilder(AmqpSession parent, JmsSessionInfo resourceInfo) {
- super(parent, resourceInfo);
- }
-
- @Override
- protected Sender createEndpoint(JmsSessionInfo resourceInfo) {
- Coordinator coordinator = new Coordinator();
- coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
- Source source = new Source();
-
- String coordinatorName = "qpid-jms:coordinator:" + resourceInfo.getId().toString();
-
- Sender sender = getParent().getEndpoint().sender(coordinatorName);
- sender.setSource(source);
- sender.setTarget(coordinator);
- sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-
- return sender;
- }
-
- @Override
- protected AmqpTransactionContext createResource(AmqpSession parent, JmsSessionInfo resourceInfo, Sender endpoint) {
- return new AmqpTransactionContext(parent, resourceInfo, endpoint);
- }
-
- @Override
- protected boolean isClosePending() {
- // When no link terminus was created, the peer will now detach/close us otherwise
- // we need to validate the returned remote source prior to open completion.
- return getEndpoint().getRemoteTarget() == null;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionCoordinatorBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionCoordinatorBuilder.java
new file mode 100644
index 0000000..6a01491
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTransactionCoordinatorBuilder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.builders;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.amqp.AmqpTransactionContext;
+import org.apache.qpid.jms.provider.amqp.AmqpTransactionCoordinator;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * Resource builder responsible for creating and opening an AmqpTransactionCoordinator instance.
+ */
+public class AmqpTransactionCoordinatorBuilder extends AmqpResourceBuilder<AmqpTransactionCoordinator, AmqpTransactionContext, JmsSessionInfo, Sender> {
+
+ public AmqpTransactionCoordinatorBuilder(AmqpTransactionContext parent, JmsSessionInfo resourceInfo) {
+ super(parent, resourceInfo);
+ }
+
+ @Override
+ protected Sender createEndpoint(JmsSessionInfo resourceInfo) {
+ Coordinator coordinator = new Coordinator();
+ coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
+ Source source = new Source();
+
+ String coordinatorName = "qpid-jms:coordinator:" + resourceInfo.getId().toString();
+
+ Sender sender = getParent().getSession().getEndpoint().sender(coordinatorName);
+ sender.setSource(source);
+ sender.setTarget(coordinator);
+ sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ return sender;
+ }
+
+ @Override
+ protected AmqpTransactionCoordinator createResource(AmqpTransactionContext parent, JmsSessionInfo resourceInfo, Sender endpoint) {
+ return new AmqpTransactionCoordinator(resourceInfo, endpoint, parent);
+ }
+
+ @Override
+ protected boolean isClosePending() {
+ // When no link terminus was created, the peer will now detach/close us otherwise
+ // we need to validate the returned remote source prior to open completion.
+ return getEndpoint().getRemoteTarget() == null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index f493a1c..a84d260 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -61,11 +61,7 @@ import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.jms.util.MetaDataSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@@ -120,9 +116,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
assertNotNull("Session should not be null", session);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
index f2bf986..b13fa00 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/QueueBrowserIntegrationTest.java
@@ -40,7 +40,6 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
-import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.Binary;
@@ -262,8 +261,7 @@ public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 9fc563f..ef77481 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -69,7 +69,6 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
@@ -1211,8 +1210,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/974a8510/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index c844e1e..be73403 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -41,9 +41,6 @@ import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
import org.apache.qpid.jms.test.testpeer.describedtypes.Error;
import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
@@ -51,7 +48,6 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
@@ -59,17 +55,20 @@ import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests for behavior of Transacted Session operations.
*/
public class TransactionsIntegrationTest extends QpidJmsTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionsIntegrationTest.class);
+
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
@Test(timeout=20000)
@@ -79,24 +78,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
- // First expect an unsettled 'declare' transfer to the txn coordinator, and
- // reply with a Declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
- // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
- // and reply with accepted and settled disposition to indicate the rollback succeeded.
- Discharge discharge = new Discharge();
- discharge.setFail(true);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+ // Closed session should roll-back the TX with a failed discharge
+ testPeer.expectDischarge(txnId, true);
testPeer.expectEnd();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
@@ -114,22 +102,19 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.expectSenderAttach();
- MessageProducer producer = session.createProducer(queue);
+ MessageProducer producer = session.createProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
@@ -152,19 +137,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with rejected and settled disposition to indicate the commit failed
- Discharge discharge = new Discharge();
- discharge.setFail(false);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Rejected(), true);
+ testPeer.expectDischarge(txnId, false, new Rejected());
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
try {
session.commit();
@@ -183,22 +161,19 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.expectSenderAttach();
- MessageProducer producer = session.createProducer(queue);
+ MessageProducer producer = session.createProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
@@ -221,20 +196,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with rejected and settled disposition to indicate the commit failed
- Discharge discharge = new Discharge();
- discharge.setFail(false);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, commitFailure, true);
+ testPeer.expectDischarge(txnId, false, commitFailure);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
try {
session.commit();
@@ -268,22 +236,19 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.expectSenderAttach();
- MessageProducer producer = session.createProducer(queue);
+ MessageProducer producer = session.createProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
@@ -306,20 +271,13 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with rejected and settled disposition to indicate the rollback failed
- Discharge discharge = new Discharge();
- discharge.setFail(true);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, commitFailure, true);
+ testPeer.expectDischarge(txnId, true, commitFailure);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
try {
session.rollback();
@@ -362,15 +320,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
@@ -398,19 +353,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the commit succeeded
- Discharge discharge = new Discharge();
- discharge.setFail(false);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+ testPeer.expectDischarge(txnId, false);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
session.commit();
@@ -425,22 +373,19 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.expectSenderAttach();
- MessageProducer producer = session.createProducer(queue);
+ MessageProducer producer = session.createProducer(queue);
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
@@ -481,15 +426,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
@@ -520,19 +462,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the rollback succeeded
- Discharge discharge = new Discharge();
- discharge.setFail(true);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+ testPeer.expectDischarge(txnId, true);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
// Expect the messages that were not consumed to be released
int unconsumed = transferCount - consumeCount;
@@ -558,15 +493,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
@@ -579,7 +511,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.expectSenderAttach();
- MessageProducer producer = session.createProducer(queue);
+ MessageProducer producer = session.createProducer(queue);
// Expect the message which provoked creating the transaction
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
@@ -602,19 +534,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the rollback succeeded
- Discharge discharge = new Discharge();
- discharge.setFail(true);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+ testPeer.expectDischarge(txnId, true);
// Now expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
// Expect the messages that were not consumed to be released
for (int i = 1; i <= messageCount; i++) {
@@ -639,15 +564,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
@@ -660,7 +582,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.expectSenderAttach();
- MessageProducer producer = session.createProducer(queue);
+ MessageProducer producer = session.createProducer(queue);
// Expect the message which provoked creating the transaction
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
@@ -688,19 +610,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with accepted and settled disposition to indicate the rollback succeeded
- Discharge discharge = new Discharge();
- discharge.setFail(true);
- discharge.setTxnId(txnId);
- TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
- dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
- testPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+ testPeer.expectDischarge(txnId, true);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
// Expect the messages that were not consumed to be released
for (int i = 1; i <= messageCount; i++) {
@@ -723,15 +638,12 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
connection.start();
testPeer.expectBegin();
- CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
- testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+ testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
- TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
- declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
- testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+ testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
String queueName = "myQueue";
@@ -752,4 +664,104 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
}
}
+
+    /**
+     * Checks that {@code Session.commit()} surfaces a TransactionRolledBackException
+     * when the peer is scripted to remotely close the coordinator link on the
+     * discharge that the commit sends.
+     */
+    @Test(timeout=20000)
+    public void testRollbackErrorCoordinatorClosedOnCommit() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            // The same transaction id is handed out for both the first and the
+            // replacement transaction declared below.
+            final Binary txnId = new Binary(new byte[] { 5, 6, 7, 8 });
+
+            // Session creation: begin, coordinator link attach, first declare.
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+
+            // The discharge is answered by closing the coordinator link; the second
+            // argument is presumably the discharge 'fail' flag (false == commit).
+            testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, false);
+
+            // Afterwards a fresh coordinator link and a new declare are expected.
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            try {
+                session.commit();
+                fail("Transaction should have rolled back");
+            } catch (TransactionRolledBackException expected) {
+                LOG.info("Caught expected TransactionRolledBackException");
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Checks that {@code Session.rollback()} throws a JMSException when the peer is
+     * scripted to remotely close the coordinator link on the discharge that the
+     * rollback sends.
+     */
+    @Test(timeout=20000)
+    public void testJMSErrorCoordinatorClosedOnRollback() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            // Session creation: begin, coordinator link attach, first declare.
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            // The discharge is answered by closing the coordinator link; the second
+            // argument is presumably the discharge 'fail' flag (true == rollback).
+            testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId, true);
+
+            // Afterwards a fresh coordinator link and a new declare are expected.
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            try {
+                session.rollback();
+                // Reaching this point means rollback() returned normally, which is the
+                // failure case for this test; say so instead of reusing the commit
+                // test's "should have rolled back" wording.
+                fail("Rollback should have thrown a JMSException");
+            } catch (JMSException ex) {
+                LOG.info("Caught expected JMSException");
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Checks that when the coordinator link is remotely closed in the middle of a
+     * transaction, a subsequent send does not reach the peer and the following
+     * {@code Session.commit()} throws a TransactionRolledBackException, after which
+     * a new coordinator link and transaction are established.
+     */
+    @Test(timeout=20000) // bounded like the sibling tests so a stalled peer cannot hang the build
+    public void testSendAfterCoordinatorLinkClosedDuringTX() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a Declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            testPeer.expectDeclare(txnId);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expect the sender link attach triggered by creating the producer below.
+            testPeer.expectSenderAttach();
+
+            // Close the link, the messages should now just get dropped on the floor.
+            testPeer.remotelyCloseLastCoordinatorLink();
+
+            MessageProducer producer = session.createProducer(queue);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            // No transfer is expected on the peer for this send.
+            producer.send(session.createMessage());
+
+            // Expect that a new link will be created in order to start the next TX.
+            txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+
+            try {
+                session.commit();
+                fail("Commit operation should have failed.");
+            } catch (TransactionRolledBackException jmsTxRb) {
+                // Expected outcome; log it rather than swallowing silently.
+                LOG.info("Caught expected TransactionRolledBackException");
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org