You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/23 18:00:52 UTC
qpid-jms git commit: QPIDJMS-125 Ensure that resource can close
immediately after the transaction has completed.
Repository: qpid-jms
Updated Branches:
refs/heads/master fe6b75c84 -> 3b1dd1f3a
QPIDJMS-125 Ensure that resource can close immediately after the
transaction has completed.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3b1dd1f3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3b1dd1f3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3b1dd1f3
Branch: refs/heads/master
Commit: 3b1dd1f3a7135533b8a16e9f75ca4a176a710994
Parents: fe6b75c
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 23 12:00:13 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 23 12:00:13 2015 -0400
----------------------------------------------------------------------
.../qpid/jms/JmsLocalTransactionContext.java | 46 +++++++++-----
.../org/apache/qpid/jms/JmsMessageConsumer.java | 8 +--
.../qpid/jms/JmsNoTxTransactionContext.java | 8 ++-
.../apache/qpid/jms/JmsTransactionContext.java | 20 +++++-
.../qpid/jms/JmsTransactionSynchronization.java | 65 ++++++++++++++++++++
.../apache/qpid/jms/JmsTxSynchronization.java | 65 --------------------
.../TransactionsIntegrationTest.java | 31 +++++++---
.../transactions/JmsTransactedConsumerTest.java | 34 ++++++++++
8 files changed, 180 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 6a86033..54a3625 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -17,7 +17,9 @@
package org.apache.qpid.jms;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException;
@@ -26,6 +28,7 @@ import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsResourceId;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.meta.JmsTransactionInfo;
import org.apache.qpid.jms.provider.Provider;
@@ -42,12 +45,12 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(JmsLocalTransactionContext.class);
- private final List<JmsTxSynchronization> synchronizations = new ArrayList<JmsTxSynchronization>();
+ private final List<JmsTransactionSynchronization> synchronizations = new ArrayList<JmsTransactionSynchronization>();
+ private final Map<JmsResourceId, JmsResourceId> participants = new HashMap<JmsResourceId, JmsResourceId>();
private final JmsSession session;
private final JmsConnection connection;
private volatile JmsTransactionId transactionId;
private volatile boolean failed;
- private volatile boolean hasWork;
private JmsTransactionListener listener;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -58,7 +61,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
}
@Override
- public void send(JmsConnection connection, JmsOutboundMessageDispatch envelope) throws JMSException {
+ public void send(JmsConnection connection, final JmsOutboundMessageDispatch envelope) throws JMSException {
lock.readLock().lock();
try {
if (isFailed()) {
@@ -71,13 +74,13 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
@Override
public void onPendingSuccess() {
LOG.trace("TX:{} has performed a send.", getTransactionId());
- hasWork = true;
+ participants.put(envelope.getProducerId(), envelope.getProducerId());
}
@Override
public void onPendingFailure(Throwable cause) {
LOG.trace("TX:{} has a failed send.", getTransactionId());
- hasWork = true;
+ participants.put(envelope.getProducerId(), envelope.getProducerId());
}
});
} finally {
@@ -86,7 +89,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
}
@Override
- public void acknowledge(JmsConnection connection, JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
+ public void acknowledge(JmsConnection connection, final JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
// Consumed or delivered messages fall into a transaction otherwise just pass it in.
if (ackType == ACK_TYPE.ACCEPTED || ackType == ACK_TYPE.DELIVERED) {
lock.readLock().lock();
@@ -96,13 +99,13 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
@Override
public void onPendingSuccess() {
LOG.trace("TX:{} has performed a acknowledge.", getTransactionId());
- hasWork = true;
+ participants.put(envelope.getConsumerId(), envelope.getConsumerId());
}
@Override
public void onPendingFailure(Throwable cause) {
LOG.trace("TX:{} has failed a acknowledge.", getTransactionId());
- hasWork = true;
+ participants.put(envelope.getConsumerId(), envelope.getConsumerId());
}
});
} finally {
@@ -114,8 +117,8 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
}
@Override
- public void addSynchronization(JmsTxSynchronization sync) throws JMSException {
- lock.readLock().lock();
+ public void addSynchronization(JmsTransactionSynchronization sync) throws JMSException {
+ lock.writeLock().lock();
try {
if (sync.validate(this)) {
synchronizations.add(sync);
@@ -123,7 +126,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
} catch (Exception e) {
throw JmsExceptionSupport.create(e);
} finally {
- lock.readLock().unlock();
+ lock.writeLock().unlock();
}
}
@@ -269,7 +272,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
public void onConnectionInterrupted() {
lock.writeLock().tryLock();
try {
- failed = hasWork;
+ failed = !participants.isEmpty();
} finally {
if (lock.writeLock().isHeldByCurrentThread()) {
lock.writeLock().unlock();
@@ -294,7 +297,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
// It is ok to use the newly created TX from here if the TX never had any
// work done within it otherwise we want the next commit to fail.
- failed = hasWork;
+ failed = !participants.isEmpty();
} finally {
if (lock.writeLock().isHeldByCurrentThread()) {
lock.writeLock().unlock();
@@ -334,6 +337,16 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
}
}
+ @Override
+ public boolean isActiveInThisContext(JmsResourceId resouceId) {
+ lock.readLock().lock();
+ try {
+ return participants.containsKey(resouceId);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
//------------- Implementation methods -----------------------------------//
/*
@@ -343,7 +356,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
private void reset() {
transactionId = null;
failed = false;
- hasWork = false;
+ participants.clear();
}
/*
@@ -356,7 +369,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
}
Throwable firstException = null;
- for (JmsTxSynchronization sync : synchronizations) {
+ for (JmsTransactionSynchronization sync : synchronizations) {
try {
sync.afterRollback();
} catch (Throwable thrown) {
@@ -366,6 +379,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
}
}
}
+
synchronizations.clear();
if (firstException != null) {
throw JmsExceptionSupport.create(firstException);
@@ -382,7 +396,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
}
Throwable firstException = null;
- for (JmsTxSynchronization sync : synchronizations) {
+ for (JmsTransactionSynchronization sync : synchronizations) {
try {
sync.afterCommit();
} catch (Throwable thrown) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 06f036c..3e182f8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -61,7 +61,6 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
protected final MessageQueue messageQueue;
protected final Lock lock = new ReentrantLock();
protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
- protected final AtomicBoolean delivered = new AtomicBoolean();
protected final AtomicReference<Exception> failureCause = new AtomicReference<>();
protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
@@ -120,11 +119,11 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
@Override
public void close() throws JMSException {
if (!closed.get()) {
- session.getTransactionContext().addSynchronization(new JmsTxSynchronization() {
+ session.getTransactionContext().addSynchronization(new JmsTransactionSynchronization() {
@Override
public boolean validate(JmsTransactionContext context) throws Exception {
- if (!context.isInTransaction() || !delivered.get() || isBrowser()) {
+ if (isBrowser() || !context.isActiveInThisContext(getConsumerId())) {
doClose();
return false;
}
@@ -358,9 +357,6 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
} else {
doAckConsumed(envelope);
}
-
- // Tags that we have delivered and can't close if in a TX Session.
- delivered.set(true);
}
return envelope;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
index 23b6168..214ec7c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
@@ -21,6 +21,7 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsResourceId;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
@@ -42,7 +43,7 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext {
}
@Override
- public void addSynchronization(JmsTxSynchronization sync) throws JMSException {
+ public void addSynchronization(JmsTransactionSynchronization sync) throws JMSException {
try {
sync.validate(this);
} catch (Exception e) {
@@ -91,6 +92,11 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext {
}
@Override
+ public boolean isActiveInThisContext(JmsResourceId resouceId) {
+ return false;
+ }
+
+ @Override
public void onConnectionInterrupted() {
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
index 008bb9c..bfc2c8e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
@@ -20,6 +20,7 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsResourceId;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
@@ -70,7 +71,7 @@ public interface JmsTransactionContext {
*
* @throws JMSException if an error occurs during the send.
*/
- void addSynchronization(JmsTxSynchronization sync) throws JMSException;
+ void addSynchronization(JmsTransactionSynchronization sync) throws JMSException;
/**
* @return if the currently transaction has been marked as being failed.
@@ -140,6 +141,23 @@ public interface JmsTransactionContext {
boolean isInTransaction();
/**
+ * Allows a resource to query the transaction context to determine if it has pending
+ * work in the current transaction.
+ *
+ * Callers should use caution with this method as it is only a view into the current
+ * state without blocking ongoing transaction operations. The best use of this method is
+ * in the validation method of a JmsTransactionSynchronization to determine if the
+ * synchronization needs to be added based on whether to requesting resource has any
+ * pending operations.
+ *
+ * @param resouceId
+ * The JmsResourceId of the resource making this query.
+ *
+ * @return true if the resource has pending work in the current transaction.
+ */
+ boolean isActiveInThisContext(JmsResourceId resouceId);
+
+ /**
* Signals that the connection that was previously established has been lost and the
* listener should alter its state to reflect the fact that there is no active connection.
*/
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java
new file mode 100644
index 0000000..12aa719
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Interface for JmsResources that are part of a running transaction to use
+ * to register for notifications of transaction commit and rollback in order
+ * to execute specific actions.
+ *
+ * One such use of this might be for a consumer to register a synchronization
+ * when it is closed while it's parent session is still operating inside a
+ * transaction. The Consumer can close itself following the commit or rollback
+ * of the running Transaction.
+ */
+public abstract class JmsTransactionSynchronization {
+
+ /**
+ * Called once before the synchronization is added to the set
+ * of synchronizations held for a pending TX. The caller can
+ * check TX state and react accordingly. If the resource finds
+ * that is does not need to be added to the TX it can return false
+ * to indicate such.
+ *
+ * @param context
+ * reference to the transaction context.
+ *
+ * @return true if the synchronization should be added to the TX.
+ *
+ * @throws Exception if an error occurs during the event.
+ */
+ public boolean validate(JmsTransactionContext context) throws Exception {
+ return true;
+ }
+
+ /**
+ * Called after a successful commit of the current Transaction.
+ *
+ * @throws Exception if an error occurs during the event.
+ */
+ public void afterCommit() throws Exception {
+ }
+
+ /**
+ * Called after the current transaction has been rolled back either
+ * by a call to rollback or by a failure to complete a commit operation.
+ *
+ * @throws Exception if an error occurs during the event.
+ */
+ public void afterRollback() throws Exception {
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
deleted file mode 100644
index f9b65d9..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms;
-
-/**
- * Interface for JmsResources that are part of a running transaction to use
- * to register for notifications of transaction commit and rollback in order
- * to execute specific actions.
- *
- * One such use of this might be for a consumer to register a synchronization
- * when it is closed while it's parent session is still operating inside a
- * transaction. The Consumer can close itself following the commit or rollback
- * of the running Transaction.
- */
-public abstract class JmsTxSynchronization {
-
- /**
- * Called once before the synchronization is added to the set
- * of synchronizations held for a pending TX. The caller can
- * check TX state and react accordingly. If the resource finds
- * that is does not need to be added to the TX it can return false
- * to indicate such.
- *
- * @param context
- * reference to the transaction context.
- *
- * @return true if the synchronization should be added to the TX.
- *
- * @throws Exception if an error occurs during the event.
- */
- public boolean validate(JmsTransactionContext context) throws Exception {
- return true;
- }
-
- /**
- * Called after a successful commit of the current Transaction.
- *
- * @throws Exception if an error occurs during the event.
- */
- public void afterCommit() throws Exception {
- }
-
- /**
- * Called after the current transaction has been rolled back either
- * by a call to rollback or by a failure to complete a commit operation.
- *
- * @throws Exception if an error occurs during the event.
- */
- public void afterRollback() throws Exception {
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index e2d62b6..a43be96 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -309,25 +309,35 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
@Test(timeout=20000)
public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
- doCommitTransactedSessionWithConsumerTestImpl(1, 1, false);
+ doCommitTransactedSessionWithConsumerTestImpl(1, 1, false, false);
}
@Test(timeout=20000)
- public void testCommitTransactedSessionWithConsumerReceivingAllMessagesAndClose() throws Exception {
- doCommitTransactedSessionWithConsumerTestImpl(1, 1, true);
+ public void testCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseBefore() throws Exception {
+ doCommitTransactedSessionWithConsumerTestImpl(1, 1, true, true);
+ }
+
+ @Test(timeout=20000)
+ public void testCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseAfter() throws Exception {
+ doCommitTransactedSessionWithConsumerTestImpl(1, 1, true, false);
}
@Test(timeout=20000)
public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
- doCommitTransactedSessionWithConsumerTestImpl(5, 2, false);
+ doCommitTransactedSessionWithConsumerTestImpl(5, 2, false, false);
+ }
+
+ @Test(timeout=20000)
+ public void testCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesBefore() throws Exception {
+ doCommitTransactedSessionWithConsumerTestImpl(5, 2, true, true);
}
@Test(timeout=20000)
- public void testCommitTransactedSessionWithConsumerReceivingSomeMessagesAndCloses() throws Exception {
- doCommitTransactedSessionWithConsumerTestImpl(5, 2, true);
+ public void testCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesAfter() throws Exception {
+ doCommitTransactedSessionWithConsumerTestImpl(5, 2, true, false);
}
- private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer) throws Exception {
+ private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer, boolean closeBeforeCommit) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
@@ -369,7 +379,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.expectDischarge(txnId, false);
// Expect the consumer to close now
- if (closeConsumer) {
+ if (closeConsumer && closeBeforeCommit) {
testPeer.expectDetach(true, true, true);
// Expect the messages that were not consumed to be released
@@ -388,6 +398,11 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
session.commit();
+ if (closeConsumer && !closeBeforeCommit) {
+ testPeer.expectDetach(true, true, true);
+ messageConsumer.close();
+ }
+
testPeer.waitForAllHandlersToComplete(1000);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index 461bedb..c3d4a98 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -490,4 +490,38 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport {
session.close();
}
+
+ @Test(timeout = 60000)
+ public void testConsumerClosesAfterItsTXCommits() throws Exception {
+ connection = createAmqpConnection();
+ connection.start();
+
+ Session mgmtSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = mgmtSession.createQueue(name.getMethodName());
+
+ // Send a message that will be rolled back.
+ Session senderSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = senderSession.createProducer(queue);
+ producer.send(senderSession.createMessage());
+ senderSession.commit();
+ senderSession.close();
+
+ // Consumer the message in a transaction and then roll it back
+ Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = txSession.createConsumer(queue);
+ Message received = consumer.receive(1000);
+ assertNotNull("Consumer didn't receive the message", received);
+ txSession.rollback();
+ consumer.close();
+
+ // Create an auto acknowledge session and consumer normally.
+ Session nonTxSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = nonTxSession.createConsumer(queue);
+ received = consumer.receive(1000);
+ assertNotNull("receiver3 didn't received the message", received);
+ consumer.close();
+
+ connection.close();
+ }
}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org