You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/25 16:19:34 UTC

qpid-jms git commit: NO-JIRA Add test coverage for scenarios not currently covered in the client integration tests.

Repository: qpid-jms
Updated Branches:
  refs/heads/master b646ca0f9 -> 53d29e85d


NO-JIRA Add test coverage for scenarios not currently covered in the
client integration tests.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/53d29e85
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/53d29e85
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/53d29e85

Branch: refs/heads/master
Commit: 53d29e85d249dab5b0ec6636dc0fa262a7a67891
Parents: b646ca0
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 24 18:33:07 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed May 25 12:19:01 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    |   5 +-
 .../jms/integration/SessionIntegrationTest.java | 327 ++++++++++++++++++-
 2 files changed, 324 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/53d29e85/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 7b3d20f..ff28e59 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -628,7 +628,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException {
         JmsDestination destination = JmsMessageTransformation.transformDestination(connection, dest);
 
-        if(destination.isTemporary() && ((JmsTemporaryDestination) destination).isDeleted()) {
+        if (destination.isTemporary() && ((JmsTemporaryDestination) destination).isDeleted()) {
             throw new IllegalStateException("Temporary destination has been deleted");
         }
 
@@ -730,7 +730,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
      * method is usually only called when the Session is in the CLIENT_ACKNOWLEDGE mode.
      *
      * @param ackType
-     *        The type of acknowledgement being done.
+     *      The type of acknowledgement being done.
+     *
      * @throws JMSException if an error occurs while the acknowledge is processed.
      */
     void acknowledge(ACK_TYPE ackType) throws JMSException {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/53d29e85/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index bca3a57..c2d3dd6 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -69,8 +69,6 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
-import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
@@ -82,7 +80,6 @@ import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.jms.util.QpidJMSTestRunner;
 import org.apache.qpid.jms.util.Repeat;
 import org.apache.qpid.proton.amqp.Binary;
@@ -108,10 +105,16 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             assertNotNull("Session should not be null", session);
             testPeer.expectEnd();
+            testPeer.expectClose();
+
             session.close();
 
             // Should send nothing and throw no error.
             session.close();
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
         }
     }
 
@@ -150,9 +153,14 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             testPeer.expectSenderAttach();
+            testPeer.expectClose();
 
             Queue queue = session.createQueue("myQueue");
             session.createProducer(queue);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
         }
     }
 
@@ -176,9 +184,14 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             targetMatcher.withAddress(equalTo(queueName));
 
             testPeer.expectSenderAttach(sourceMatcher, targetMatcher, false, false);
+            testPeer.expectClose();
 
             Queue queue = session.createQueue(queueName);
             session.createProducer(queue);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
         }
     }
 
@@ -194,10 +207,69 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             testPeer.expectReceiverAttach();
             testPeer.expectLinkFlow();
+            testPeer.expectClose();
 
             Queue queue = session.createQueue("myQueue");
             session.createConsumer(queue);
 
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateConsumerWithEmptySelector() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.expectClose();
+
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue, "");
+            assertNull(consumer.getMessageSelector());
+            consumer = session.createConsumer(queue, "", false);
+            assertNull(consumer.getMessageSelector());
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateConsumerWithNullSelector() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.expectClose();
+
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue, null);
+            assertNull(consumer.getMessageSelector());
+            consumer = session.createConsumer(queue, null, false);
+            assertNull(consumer.getMessageSelector());
+
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
@@ -232,6 +304,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.expectReceiverAttach(notNullValue(), targetMatcher, true, deferAttachResponseWrite);
             //Expect the detach response to the test peer closing the consumer link after refusal.
             testPeer.expectDetach(true, false, false);
+            testPeer.expectClose();
 
             try {
                 //Create a consumer, expect it to throw exception due to the link-refusal
@@ -241,6 +314,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 //Expected
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -268,6 +343,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             testPeer.expectReceiverAttach(notNullValue(), targetMatcher, false, true, true, false, null, null);
             testPeer.expectDetach(true, false, false);
+            testPeer.expectClose();
 
             try {
                 // Create a consumer, expect it to throw exception due to the link-refusal
@@ -279,6 +355,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Caught expected error on consumer create: {}", ex.getMessage());
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -300,6 +378,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             testPeer.expectReceiverAttach(notNullValue(), notNullValue(), true, true, true, false, null, null);
             testPeer.expectDetach(true, false, false);
+            testPeer.expectClose();
 
             try {
                 // Create a QueueBrowser, expect it to throw exception due to the link-refusal
@@ -312,6 +391,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Caught expected error on browser create: {}", ex.getMessage());
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -363,6 +444,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             } catch (JMSSecurityException jmsse) {
             }
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -384,6 +468,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             assertNotNull("TemporaryQueue queue name was null", tempQueue.getQueueName());
             assertEquals("TemporaryQueue name not as expected", dynamicAddress, tempQueue.getQueueName());
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -408,6 +495,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Caught expected exception: {}", jmsEx.getMessage());
             }
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -429,6 +519,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.expectDetach(true, true, true);
             tempQueue.delete();
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -458,6 +551,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Caught expected exception: {}", jmsEx.getMessage());
             }
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -479,6 +575,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             assertNotNull("TemporaryTopic name was null", tempTopic.getTopicName());
             assertEquals("TemporaryTopic name not as expected", dynamicAddress, tempTopic.getTopicName());
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -503,6 +602,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Caught expected exception: {}", jmsEx.getMessage());
             }
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -524,6 +626,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.expectDetach(true, true, true);
             tempTopic.delete();
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -553,6 +658,153 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Caught expected exception: {}", jmsEx.getMessage());
             }
 
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testSendToDeletedTemporaryTopicFails() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String dynamicAddress = "myTempTopicAddress";
+            testPeer.expectTempTopicCreationAttach(dynamicAddress);
+            TemporaryTopic tempTopic = session.createTemporaryTopic();
+
+            testPeer.expectSenderAttach();
+
+            MessageProducer producer = session.createProducer(tempTopic);
+
+            // Deleting the TemporaryTopic will be achieved by closing its creating link.
+            testPeer.expectDetach(true, true, true);
+            tempTopic.delete();
+
+            try {
+                producer.send(session.createMessage());
+                fail("Should detect that the destination was deleted and fail");
+            } catch (JMSException ignored) {
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testSendToDeletedTemporaryQueueFails() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String dynamicAddress = "myTempQueueAddress";
+            testPeer.expectTempQueueCreationAttach(dynamicAddress);
+            TemporaryQueue tempQueue = session.createTemporaryQueue();
+
+            testPeer.expectSenderAttach();
+
+            MessageProducer producer = session.createProducer(tempQueue);
+
+            // Deleting the TemporaryQueue will be achieved by closing its creating link.
+            testPeer.expectDetach(true, true, true);
+            tempQueue.delete();
+
+            try {
+                producer.send(session.createMessage());
+                fail("Should detect that the destination was deleted and fail");
+            } catch (JMSException ignored) {
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCannotDeleteTemporaryQueueInUse() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String dynamicAddress = "myTempQueueAddress";
+            testPeer.expectTempQueueCreationAttach(dynamicAddress);
+            TemporaryQueue tempQueue = session.createTemporaryQueue();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+
+            MessageConsumer consumer = session.createConsumer(tempQueue);
+
+            try {
+                tempQueue.delete();
+                fail("Should not be able to delete an in use temp destination");
+            } catch (JMSException ex) {
+            }
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            // Deleting the TemporaryQueue will be achieved by closing its creating link.
+            testPeer.expectDetach(true, true, true);
+            tempQueue.delete();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCannotDeleteTemporaryTopicInUse() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String dynamicAddress = "myTempTopicAddress";
+            testPeer.expectTempTopicCreationAttach(dynamicAddress);
+            TemporaryTopic tempTopic = session.createTemporaryTopic();
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+
+            MessageConsumer consumer = session.createConsumer(tempTopic);
+
+            try {
+                tempTopic.delete();
+                fail("Should not be able to delete an in use temp destination");
+            } catch (JMSException ex) {
+            }
+
+            testPeer.expectDetach(true, true, true);
+            consumer.close();
+
+            // Deleting the TemporaryTopic will be achieved by closing its creating link.
+            testPeer.expectDetach(true, true, true);
+            tempTopic.delete();
+
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -611,9 +863,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
             testPeer.expectLinkFlow();
+            testPeer.expectClose();
 
             session.createConsumer(dest);
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -639,6 +894,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             } catch (JMSSecurityException jmsse) {
             }
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -664,6 +922,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             } catch (JMSSecurityException jmsse) {
             }
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -686,6 +947,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             } catch (InvalidSelectorException jmsse) {
             }
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -743,8 +1007,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             targetMatcher.withCapabilities(arrayContaining(nodeTypeCapability));
 
             testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectClose();
 
             session.createProducer(dest);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
         }
     }
 
@@ -774,6 +1043,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             MessageProducer producer = session.createProducer(null);
             assertNotNull("Producer object was null", producer);
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -855,10 +1127,13 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.expectSenderAttach(targetMatcher, false, false);
             testPeer.expectTransfer(messageMatcher);
             testPeer.expectDetach(true, true, true);
+            testPeer.expectClose();
 
             Message message = session.createMessage();
             producer.send(dest, message);
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -884,6 +1159,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             assertFalse("TopicSubscriber should not be no-local", subscriber.getNoLocal());
             assertNull("TopicSubscriber should not have a selector", subscriber.getMessageSelector());
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -943,6 +1221,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 // Expected
             }
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -968,6 +1249,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.expectDetach(false, true, false);
             subscriber.close();
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -1016,6 +1300,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             producer.send(dest, message);
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -1050,6 +1337,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.expectSenderAttach(targetMatcher, true, false);
             //Expect the detach response to the test peer closing the producer link after refusal.
             testPeer.expectDetach(true, false, false);
+            testPeer.expectClose();
 
             try {
                 session.createProducer(null);
@@ -1058,6 +1346,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 //expected
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -1092,6 +1382,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.expectSenderAttach(targetMatcher, true, deferAttachResponseWrite);
             //Expect the detach response to the test peer closing the producer link after refusal.
             testPeer.expectDetach(true, false, false);
+            testPeer.expectClose();
 
             try {
                 //Create a producer, expect it to throw exception due to the link-refusal
@@ -1101,6 +1392,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 //Expected
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -1137,6 +1430,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Caught expected exception on create: {}", ex.getMessage());
             }
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -1190,6 +1486,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             producer.send(dest, message);
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -1235,6 +1534,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             assertTrue("Should have received the final message", m instanceof TextMessage);
             assertEquals("Unexpected content", expectedContent, ((TextMessage)m).getText());
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
@@ -1256,6 +1558,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             session.createConsumer(queue);
 
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -1321,6 +1626,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             // Try closing it explicitly, should effectively no-op in client.
             // The test peer will throw during close if it sends anything.
             producer.close();
+
+            testPeer.expectClose();
+            connection.close();
         }
     }
 
@@ -1384,6 +1692,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             // Try closing it explicitly, should effectively no-op in client.
             // The test peer will throw during close if it sends anything.
             consumer.close();
+
+            testPeer.expectClose();
+            connection.close();
         }
     }
 
@@ -1480,9 +1791,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
             Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
-            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
-            testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+            testPeer.expectDeclare(txnId);
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
 
@@ -1516,6 +1825,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
             assertTrue("Not all messages received in given time", done.await(10, TimeUnit.SECONDS));
             assertEquals("Messages were not in expected order, final index was wrong", messageCount - 1, index.get());
+
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org