You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/23 18:00:52 UTC

qpid-jms git commit: QPIDJMS-125 Ensure that resource can close immediately after the transaction has completed.

Repository: qpid-jms
Updated Branches:
  refs/heads/master fe6b75c84 -> 3b1dd1f3a


QPIDJMS-125 Ensure that resource can close immediately after the
transaction has completed.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3b1dd1f3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3b1dd1f3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3b1dd1f3

Branch: refs/heads/master
Commit: 3b1dd1f3a7135533b8a16e9f75ca4a176a710994
Parents: fe6b75c
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 23 12:00:13 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 23 12:00:13 2015 -0400

----------------------------------------------------------------------
 .../qpid/jms/JmsLocalTransactionContext.java    | 46 +++++++++-----
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  8 +--
 .../qpid/jms/JmsNoTxTransactionContext.java     |  8 ++-
 .../apache/qpid/jms/JmsTransactionContext.java  | 20 +++++-
 .../qpid/jms/JmsTransactionSynchronization.java | 65 ++++++++++++++++++++
 .../apache/qpid/jms/JmsTxSynchronization.java   | 65 --------------------
 .../TransactionsIntegrationTest.java            | 31 +++++++---
 .../transactions/JmsTransactedConsumerTest.java | 34 ++++++++++
 8 files changed, 180 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 6a86033..54a3625 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -17,7 +17,9 @@
 package org.apache.qpid.jms;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.jms.JMSException;
@@ -26,6 +28,7 @@ import javax.jms.TransactionRolledBackException;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsResourceId;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.meta.JmsTransactionInfo;
 import org.apache.qpid.jms.provider.Provider;
@@ -42,12 +45,12 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(JmsLocalTransactionContext.class);
 
-    private final List<JmsTxSynchronization> synchronizations = new ArrayList<JmsTxSynchronization>();
+    private final List<JmsTransactionSynchronization> synchronizations = new ArrayList<JmsTransactionSynchronization>();
+    private final Map<JmsResourceId, JmsResourceId> participants = new HashMap<JmsResourceId, JmsResourceId>();
     private final JmsSession session;
     private final JmsConnection connection;
     private volatile JmsTransactionId transactionId;
     private volatile boolean failed;
-    private volatile boolean hasWork;
     private JmsTransactionListener listener;
 
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -58,7 +61,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     }
 
     @Override
-    public void send(JmsConnection connection, JmsOutboundMessageDispatch envelope) throws JMSException {
+    public void send(JmsConnection connection, final JmsOutboundMessageDispatch envelope) throws JMSException {
         lock.readLock().lock();
         try {
             if (isFailed()) {
@@ -71,13 +74,13 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
                 @Override
                 public void onPendingSuccess() {
                     LOG.trace("TX:{} has performed a send.", getTransactionId());
-                    hasWork = true;
+                    participants.put(envelope.getProducerId(), envelope.getProducerId());
                 }
 
                 @Override
                 public void onPendingFailure(Throwable cause) {
                     LOG.trace("TX:{} has a failed send.", getTransactionId());
-                    hasWork = true;
+                    participants.put(envelope.getProducerId(), envelope.getProducerId());
                 }
             });
         } finally {
@@ -86,7 +89,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     }
 
     @Override
-    public void acknowledge(JmsConnection connection, JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
+    public void acknowledge(JmsConnection connection, final JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
         // Consumed or delivered messages fall into a transaction otherwise just pass it in.
         if (ackType == ACK_TYPE.ACCEPTED || ackType == ACK_TYPE.DELIVERED) {
             lock.readLock().lock();
@@ -96,13 +99,13 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
                     @Override
                     public void onPendingSuccess() {
                         LOG.trace("TX:{} has performed a acknowledge.", getTransactionId());
-                        hasWork = true;
+                        participants.put(envelope.getConsumerId(), envelope.getConsumerId());
                     }
 
                     @Override
                     public void onPendingFailure(Throwable cause) {
                         LOG.trace("TX:{} has failed a acknowledge.", getTransactionId());
-                        hasWork = true;
+                        participants.put(envelope.getConsumerId(), envelope.getConsumerId());
                     }
                 });
             } finally {
@@ -114,8 +117,8 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     }
 
     @Override
-    public void addSynchronization(JmsTxSynchronization sync) throws JMSException {
-        lock.readLock().lock();
+    public void addSynchronization(JmsTransactionSynchronization sync) throws JMSException {
+        lock.writeLock().lock();
         try {
             if (sync.validate(this)) {
                 synchronizations.add(sync);
@@ -123,7 +126,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
         } catch (Exception e) {
             throw JmsExceptionSupport.create(e);
         } finally {
-            lock.readLock().unlock();
+            lock.writeLock().unlock();
         }
     }
 
@@ -269,7 +272,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     public void onConnectionInterrupted() {
         lock.writeLock().tryLock();
         try {
-            failed = hasWork;
+            failed = !participants.isEmpty();
         } finally {
             if (lock.writeLock().isHeldByCurrentThread()) {
                 lock.writeLock().unlock();
@@ -294,7 +297,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
 
             // It is ok to use the newly created TX from here if the TX never had any
             // work done within it otherwise we want the next commit to fail.
-            failed = hasWork;
+            failed = !participants.isEmpty();
         } finally {
             if (lock.writeLock().isHeldByCurrentThread()) {
                 lock.writeLock().unlock();
@@ -334,6 +337,16 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
         }
     }
 
+    @Override
+    public boolean isActiveInThisContext(JmsResourceId resouceId) {
+        lock.readLock().lock();
+        try {
+            return participants.containsKey(resouceId);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     //------------- Implementation methods -----------------------------------//
 
     /*
@@ -343,7 +356,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     private void reset() {
         transactionId = null;
         failed = false;
-        hasWork = false;
+        participants.clear();
     }
 
     /*
@@ -356,7 +369,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
         }
 
         Throwable firstException = null;
-        for (JmsTxSynchronization sync : synchronizations) {
+        for (JmsTransactionSynchronization sync : synchronizations) {
             try {
                 sync.afterRollback();
             } catch (Throwable thrown) {
@@ -366,6 +379,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
                 }
             }
         }
+
         synchronizations.clear();
         if (firstException != null) {
             throw JmsExceptionSupport.create(firstException);
@@ -382,7 +396,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
         }
 
         Throwable firstException = null;
-        for (JmsTxSynchronization sync : synchronizations) {
+        for (JmsTransactionSynchronization sync : synchronizations) {
             try {
                 sync.afterCommit();
             } catch (Throwable thrown) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 06f036c..3e182f8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -61,7 +61,6 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
     protected final MessageQueue messageQueue;
     protected final Lock lock = new ReentrantLock();
     protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
-    protected final AtomicBoolean delivered = new AtomicBoolean();
     protected final AtomicReference<Exception> failureCause = new AtomicReference<>();
 
     protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
@@ -120,11 +119,11 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
     @Override
     public void close() throws JMSException {
         if (!closed.get()) {
-            session.getTransactionContext().addSynchronization(new JmsTxSynchronization() {
+            session.getTransactionContext().addSynchronization(new JmsTransactionSynchronization() {
 
                 @Override
                 public boolean validate(JmsTransactionContext context) throws Exception {
-                    if (!context.isInTransaction() || !delivered.get() || isBrowser()) {
+                    if (isBrowser() || !context.isActiveInThisContext(getConsumerId())) {
                         doClose();
                         return false;
                     }
@@ -358,9 +357,6 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
             } else {
                 doAckConsumed(envelope);
             }
-
-            // Tags that we have delivered and can't close if in a TX Session.
-            delivered.set(true);
         }
         return envelope;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
index 23b6168..214ec7c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
@@ -21,6 +21,7 @@ import javax.jms.JMSException;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsResourceId;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
@@ -42,7 +43,7 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext {
     }
 
     @Override
-    public void addSynchronization(JmsTxSynchronization sync) throws JMSException {
+    public void addSynchronization(JmsTransactionSynchronization sync) throws JMSException {
         try {
             sync.validate(this);
         } catch (Exception e) {
@@ -91,6 +92,11 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext {
     }
 
     @Override
+    public boolean isActiveInThisContext(JmsResourceId resouceId) {
+        return false;
+    }
+
+    @Override
     public void onConnectionInterrupted() {
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
index 008bb9c..bfc2c8e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
@@ -20,6 +20,7 @@ import javax.jms.JMSException;
 
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsResourceId;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
@@ -70,7 +71,7 @@ public interface JmsTransactionContext {
      *
      * @throws JMSException if an error occurs during the send.
      */
-    void addSynchronization(JmsTxSynchronization sync) throws JMSException;
+    void addSynchronization(JmsTransactionSynchronization sync) throws JMSException;
 
     /**
      * @return if the currently transaction has been marked as being failed.
@@ -140,6 +141,23 @@ public interface JmsTransactionContext {
     boolean isInTransaction();
 
     /**
+     * Allows a resource to query the transaction context to determine if it has pending
+     * work in the current transaction.
+     *
+     * Callers should use caution with this method as it is only a view into the current
+     * state without blocking ongoing transaction operations.  The best use of this method is
+     * in the validation method of a JmsTransactionSynchronization to determine if the
+     * synchronization needs to be added based on whether the requesting resource has any
+     * pending operations.
+     *
+     * @param resouceId
+     *      The JmsResourceId of the resource making this query.
+     *
+     * @return true if the resource has pending work in the current transaction.
+     */
+    boolean isActiveInThisContext(JmsResourceId resouceId);
+
+    /**
      * Signals that the connection that was previously established has been lost and the
      * listener should alter its state to reflect the fact that there is no active connection.
      */

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java
new file mode 100644
index 0000000..12aa719
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Interface for JmsResources that are part of a running transaction to use
+ * to register for notifications of transaction commit and rollback in order
+ * to execute specific actions.
+ *
+ * One such use of this might be for a consumer to register a synchronization
+ * when it is closed while its parent session is still operating inside a
+ * transaction.  The Consumer can close itself following the commit or rollback
+ * of the running Transaction.
+ */
+public abstract class JmsTransactionSynchronization {
+
+    /**
+     * Called once before the synchronization is added to the set
+     * of synchronizations held for a pending TX.  The caller can
+     * check TX state and react accordingly.  If the resource finds
+     * that it does not need to be added to the TX it can return false
+     * to indicate such.
+     *
+     * @param context
+     *        reference to the transaction context.
+     *
+     * @return true if the synchronization should be added to the TX.
+     *
+     * @throws Exception if an error occurs during the event.
+     */
+    public boolean validate(JmsTransactionContext context) throws Exception {
+        return true;
+    }
+
+    /**
+     * Called after a successful commit of the current Transaction.
+     *
+     * @throws Exception if an error occurs during the event.
+     */
+    public void afterCommit() throws Exception {
+    }
+
+    /**
+     * Called after the current transaction has been rolled back either
+     * by a call to rollback or by a failure to complete a commit operation.
+     *
+     * @throws Exception if an error occurs during the event.
+     */
+    public void afterRollback() throws Exception {
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
deleted file mode 100644
index f9b65d9..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms;
-
-/**
- * Interface for JmsResources that are part of a running transaction to use
- * to register for notifications of transaction commit and rollback in order
- * to execute specific actions.
- *
- * One such use of this might be for a consumer to register a synchronization
- * when it is closed while it's parent session is still operating inside a
- * transaction.  The Consumer can close itself following the commit or rollback
- * of the running Transaction.
- */
-public abstract class JmsTxSynchronization {
-
-    /**
-     * Called once before the synchronization is added to the set
-     * of synchronizations held for a pending TX.  The caller can
-     * check TX state and react accordingly.  If the resource finds
-     * that is does not need to be added to the TX it can return false
-     * to indicate such.
-     *
-     * @param context
-     *        reference to the transaction context.
-     *
-     * @return true if the synchronization should be added to the TX.
-     *
-     * @throws Exception if an error occurs during the event.
-     */
-    public boolean validate(JmsTransactionContext context) throws Exception {
-        return true;
-    }
-
-    /**
-     * Called after a successful commit of the current Transaction.
-     *
-     * @throws Exception if an error occurs during the event.
-     */
-    public void afterCommit() throws Exception {
-    }
-
-    /**
-     * Called after the current transaction has been rolled back either
-     * by a call to rollback or by a failure to complete a commit operation.
-     *
-     * @throws Exception if an error occurs during the event.
-     */
-    public void afterRollback() throws Exception {
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index e2d62b6..a43be96 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -309,25 +309,35 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
     @Test(timeout=20000)
     public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
-        doCommitTransactedSessionWithConsumerTestImpl(1, 1, false);
+        doCommitTransactedSessionWithConsumerTestImpl(1, 1, false, false);
     }
 
     @Test(timeout=20000)
-    public void testCommitTransactedSessionWithConsumerReceivingAllMessagesAndClose() throws Exception {
-        doCommitTransactedSessionWithConsumerTestImpl(1, 1, true);
+    public void testCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseBefore() throws Exception {
+        doCommitTransactedSessionWithConsumerTestImpl(1, 1, true, true);
+    }
+
+    @Test(timeout=20000)
+    public void testCommitTransactedSessionWithConsumerReceivingAllMessagesAndCloseAfter() throws Exception {
+        doCommitTransactedSessionWithConsumerTestImpl(1, 1, true, false);
     }
 
     @Test(timeout=20000)
     public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
-        doCommitTransactedSessionWithConsumerTestImpl(5, 2, false);
+        doCommitTransactedSessionWithConsumerTestImpl(5, 2, false, false);
+    }
+
+    @Test(timeout=20000)
+    public void testCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesBefore() throws Exception {
+        doCommitTransactedSessionWithConsumerTestImpl(5, 2, true, true);
     }
 
     @Test(timeout=20000)
-    public void testCommitTransactedSessionWithConsumerReceivingSomeMessagesAndCloses() throws Exception {
-        doCommitTransactedSessionWithConsumerTestImpl(5, 2, true);
+    public void testCommitTransactedSessionWithConsumerReceivingSomeMessagesAndClosesAfter() throws Exception {
+        doCommitTransactedSessionWithConsumerTestImpl(5, 2, true, false);
     }
 
-    private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer) throws Exception {
+    private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer, boolean closeBeforeCommit) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
@@ -369,7 +379,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.expectDischarge(txnId, false);
 
             // Expect the consumer to close now
-            if (closeConsumer) {
+            if (closeConsumer && closeBeforeCommit) {
                 testPeer.expectDetach(true, true, true);
 
                 // Expect the messages that were not consumed to be released
@@ -388,6 +398,11 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
             session.commit();
 
+            if (closeConsumer && !closeBeforeCommit) {
+                testPeer.expectDetach(true, true, true);
+                messageConsumer.close();
+            }
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3b1dd1f3/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index 461bedb..c3d4a98 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -490,4 +490,38 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport {
 
         session.close();
     }
+
+    @Test(timeout = 60000)
+    public void testConsumerClosesAfterItsTXCommits() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+
+        Session mgmtSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = mgmtSession.createQueue(name.getMethodName());
+
+        // Send and commit a message whose consumption will later be rolled back.
+        Session senderSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = senderSession.createProducer(queue);
+        producer.send(senderSession.createMessage());
+        senderSession.commit();
+        senderSession.close();
+
+        // Consume the message in a transaction and then roll it back
+        Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = txSession.createConsumer(queue);
+        Message received = consumer.receive(1000);
+        assertNotNull("Consumer didn't receive the message", received);
+        txSession.rollback();
+        consumer.close();
+
+        // Create an auto acknowledge session and consumer normally.
+        Session nonTxSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = nonTxSession.createConsumer(queue);
+        received = consumer.receive(1000);
+        assertNotNull("receiver3 didn't received the message", received);
+        consumer.close();
+
+        connection.close();
+    }
 }
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org