You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/01/10 15:30:22 UTC

qpid-jms-amqp-0-x git commit: QPID-8074: [JMS AMQP 0-x][System Tests] Copy AMQP 0-8..0-9-1 tests for unroutable messages from Broker-J into client

Repository: qpid-jms-amqp-0-x
Updated Branches:
  refs/heads/master daf535410 -> ae33f652d


QPID-8074: [JMS AMQP 0-x][System Tests] Copy AMQP 0-8..0-9-1 tests for unroutable messages from Broker-J into client


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/ae33f652
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/ae33f652
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/ae33f652

Branch: refs/heads/master
Commit: ae33f652dbad7e199bca8544069d123633d41a77
Parents: daf5354
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Jan 10 15:29:55 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Jan 10 15:29:55 2018 +0000

----------------------------------------------------------------------
 systests/pom.xml                                |   1 +
 .../apache/qpid/systest/core/BrokerAdmin.java   |   2 +
 .../apache/qpid/systest/core/JmsTestBase.java   |  34 ++-
 .../core/brokerj/SpawnQpidBrokerAdmin.java      |  29 +-
 .../AddressBasedDestinationTest.java            |  17 +-
 .../CloseOnNoRouteForMandatoryMessageTest.java  | 189 ++++++++++++
 .../ImmediateAndMandatoryPublishingTest.java    | 281 +++++++++++++++++
 .../ReturnUnroutableMandatoryMessageTest.java   | 300 +++++++++++++++++++
 .../UnroutableMessageTestExceptionListener.java | 156 ++++++++++
 9 files changed, 990 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/pom.xml
----------------------------------------------------------------------
diff --git a/systests/pom.xml b/systests/pom.xml
index 70bca14..8178bf4 100644
--- a/systests/pom.xml
+++ b/systests/pom.xml
@@ -122,6 +122,7 @@
                 <configuration>
                     <systemPropertyVariables>
                        <qpid.amqp.version>${qpid.amqp.version}</qpid.amqp.version>
+                        <qpid.dest_syntax>BURL</qpid.dest_syntax>
                     </systemPropertyVariables>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
index b2b41cc..befba5f 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
@@ -22,6 +22,7 @@ package org.apache.qpid.systest.core;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.util.Map;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -47,6 +48,7 @@ public interface BrokerAdmin
     BrokerType getBrokerType();
 
     Connection getConnection() throws JMSException;
+    Connection getConnection(Map<String, String> options) throws JMSException;
 
     enum PortType
     {

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
index 49cc3b2..12bf145 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
@@ -24,6 +24,8 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.junit.Assume.assumeThat;
 
+import java.util.Map;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
@@ -36,6 +38,8 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.configuration.ClientProperties;
+
 
 public abstract class JmsTestBase extends BrokerAdminUsingTestBase
 {
@@ -47,22 +51,28 @@ public abstract class JmsTestBase extends BrokerAdminUsingTestBase
     @Before
     public void setUpTestBase()
     {
-        assumeThat(String.format("BrokerAdmin is not available. Skipping the test %s#%s",
-                                 getClass().getName(),
-                                 _testName.getMethodName()),
-                   getBrokerAdmin(), is(notNullValue()));
         LOGGER.debug("Test receive timeout is {} milliseconds", getReceiveTimeout());
     }
 
-
     protected Connection getConnection() throws JMSException, NamingException
     {
+        return getBrokerAdmin().getConnection();
+    }
+
+    @Override
+    public BrokerAdmin getBrokerAdmin()
+    {
+        BrokerAdmin admin = super.getBrokerAdmin();
         assumeThat(String.format("BrokerAdmin is not available. Skipping the test %s#%s",
                                  getClass().getName(),
                                  _testName.getMethodName()),
-                   getBrokerAdmin(), is(notNullValue()));
+                   admin, is(notNullValue()));
+        return admin;
+    }
 
-        return getBrokerAdmin().getConnection();
+    protected Connection getConnection(final Map<String, String> options) throws JMSException
+    {
+        return getBrokerAdmin().getConnection(options);
     }
 
     protected static long getReceiveTimeout()
@@ -85,4 +95,14 @@ public abstract class JmsTestBase extends BrokerAdminUsingTestBase
     {
         return (QueueConnection) getConnection();
     }
+
+    protected String getProtocol()
+    {
+        return System.getProperty(ClientProperties.AMQP_VERSION, "0-10");
+    }
+
+    protected String getTestQueueName()
+    {
+        return getTestName();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
index 3893288..8842e83 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
@@ -236,6 +236,12 @@ public class SpawnQpidBrokerAdmin implements BrokerAdmin
         return createConnection(_virtualHostNodeName);
     }
 
+    @Override
+    public Connection getConnection(final Map<String, String> options) throws JMSException
+    {
+        return createConnection(_virtualHostNodeName, options);
+    }
+
     private void startBroker(final Class testClass)
     {
         try
@@ -632,16 +638,29 @@ public class SpawnQpidBrokerAdmin implements BrokerAdmin
 
     private Connection createConnection(String virtualHostName) throws JMSException
     {
+        return createConnection(virtualHostName, null);
+    }
+
+    private Connection createConnection(String virtualHostName,
+                                        final Map<String, String> options) throws JMSException
+    {
         final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
         initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
                                       "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
         final String factoryName = "connectionFactory";
         String urlTemplate = "amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''";
-        String url = String.format(urlTemplate,
-                                   "spawn_broker_admin",
-                                   virtualHostName,
-                                   getBrokerAddress(PortType.AMQP).getPort());
-        initialContextEnvironment.put("connectionfactory." + factoryName, url);
+        StringBuilder url = new StringBuilder(String.format(urlTemplate,
+                                                            "spawn_broker_admin",
+                                                            virtualHostName,
+                                                            getBrokerAddress(PortType.AMQP).getPort()));
+        if (options != null)
+        {
+            for (Map.Entry<String, String> option : options.entrySet())
+            {
+                url.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'");
+            }
+        }
+        initialContextEnvironment.put("connectionfactory." + factoryName, url.toString());
         try
         {
             InitialContext initialContext = new InitialContext(initialContextEnvironment);

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java b/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
index 711a50e..cced94d 100644
--- a/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
@@ -309,7 +309,7 @@ public class AddressBasedDestinationTest extends JmsTestBase
     @Test
     public void testCreateExchangeWithArgs() throws Exception
     {
-        assumeThat("Not supported by Broker-J", isCppBroker(), is(equalTo(true)));
+        assumeThat("QPID-3392: Not supported by Broker-J", isCppBroker(), is(equalTo(true)));
         createExchangeImpl(true, false, false);
     }
 
@@ -321,6 +321,10 @@ public class AddressBasedDestinationTest extends JmsTestBase
     @Test
     public void testCreateExchangeWithNonsenseArgs() throws Exception
     {
+        assumeThat("Broker-J is required",
+                   getBrokerAdmin().getBrokerType(),
+                   is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
         createExchangeImpl(true, true, false);
     }
 
@@ -782,7 +786,7 @@ public class AddressBasedDestinationTest extends JmsTestBase
     @Test
     public void testSessionCreateTopicWithExchangeArgs() throws Exception
     {
-        assumeThat("Not supported by Broker-J", isCppBroker(), is(equalTo(true)));
+        assumeThat("QPID-3392: Not supported by Broker-J", isCppBroker(), is(equalTo(true)));
         sessionCreateTopicImpl(true);
     }
 
@@ -1195,6 +1199,10 @@ public class AddressBasedDestinationTest extends JmsTestBase
     @Test
     public void testDeleteOptions() throws Exception
     {
+        assumeThat("Broker-J is required",
+                   getBrokerAdmin().getBrokerType(),
+                   is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
         Session jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         // default (create never, assert never) -------------------
@@ -1735,11 +1743,6 @@ public class AddressBasedDestinationTest extends JmsTestBase
         }
     }
 
-    private String getProtocol()
-    {
-        return System.getProperty(ClientProperties.AMQP_VERSION, "0-10");
-    }
-
     private boolean isBroker010()
     {
         return "0-10".equals(getProtocol());

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/producer/CloseOnNoRouteForMandatoryMessageTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/producer/CloseOnNoRouteForMandatoryMessageTest.java b/systests/src/test/java/org/apache/qpid/systest/producer/CloseOnNoRouteForMandatoryMessageTest.java
new file mode 100644
index 0000000..03b2458
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/producer/CloseOnNoRouteForMandatoryMessageTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.producer;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.IllegalStateException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.producer.noroute.UnroutableMessageTestExceptionListener;
+
+public class CloseOnNoRouteForMandatoryMessageTest extends JmsTestBase
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CloseOnNoRouteForMandatoryMessageTest.class);
+
+    private UnroutableMessageTestExceptionListener _testExceptionListener;
+
+    @Before
+    public void setUp()
+    {
+        // a new listener is created before every test
+        _testExceptionListener = new UnroutableMessageTestExceptionListener();
+        final BrokerAdmin.BrokerType brokerType = getBrokerAdmin().getBrokerType();
+        assumeThat("Feature 'close on no route' is only implemented Broker-J",
+                   brokerType, is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
+        final String protocol = getProtocol();
+        assumeThat("Feature 'close on no route' is only implemented for AMQP 0-8..0-9-1",
+                   protocol, is(not(equalTo("0-10"))));
+    }
+
+    @Test
+    public void testNoRoute_brokerClosesConnection() throws Exception
+    {
+        // the connection is opened with the close-when-no-route option set to true
+        final Connection con = createConnectionWithCloseWhenNoRoute(true);
+        try
+        {
+            final Session txSession = con.createSession(true, Session.SESSION_TRANSACTED);
+            final String queueName = getTestQueueName();
+            final AMQSession<?, ?> amqSession = (AMQSession<?, ?>) txSession;
+            // mandatory = true, immediate = false
+            final MessageProducer producer =
+                    amqSession.createProducer(txSession.createQueue(queueName), true, false);
+            final Message msg = txSession.createMessage();
+            producer.send(msg);
+            _testExceptionListener.assertReceivedNoRoute(queueName);
+
+            try
+            {
+                txSession.commit();
+                fail("Expected exception not thrown");
+            }
+            catch (IllegalStateException expected)
+            {
+                // The session was marked closed even before we had a chance to call commit on it
+                LOGGER.debug("Caught exception", expected);
+                final String reason = expected.getMessage();
+                assertTrue("ISE did not indicate closure", reason.contains("closed"));
+            }
+        }
+        finally
+        {
+            con.close();
+        }
+    }
+
+    @Test
+    public void testNoRouteForNonMandatoryMessage_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+    {
+        // NOTE(review): despite the method name, this test asserts that the listener is NOT called.
+        final Connection con = createConnectionWithCloseWhenNoRoute(true);
+        try
+        {
+            final Session txSession = con.createSession(true, Session.SESSION_TRANSACTED);
+            final String queueName = getTestQueueName();
+            final AMQSession<?, ?> amqSession = (AMQSession<?, ?>) txSession;
+            // mandatory = false, immediate = false
+            final MessageProducer producer =
+                    amqSession.createProducer(txSession.createQueue(queueName), false, false);
+
+            producer.send(txSession.createMessage());
+
+            // should succeed - the message is simply discarded
+            txSession.commit();
+
+            _testExceptionListener.assertNoException();
+        }
+        finally
+        {
+            con.close();
+        }
+    }
+
+    @Test
+    public void testNoRouteOnNonTransactionalSession_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+    {
+        final Connection con = createConnectionWithCloseWhenNoRoute(true);
+        try
+        {
+            final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final String queueName = getTestQueueName();
+            final AMQSession<?, ?> amqSession = (AMQSession<?, ?>) session;
+            // mandatory = true, immediate = false
+            final MessageProducer producer =
+                    amqSession.createProducer(session.createQueue(queueName), true, false);
+
+            final Message sent = session.createMessage();
+            producer.send(sent);
+            // on a non-transacted session the listener is expected to receive the returned message
+            _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(sent, queueName);
+        }
+        finally
+        {
+            con.close();
+        }
+    }
+
+    @Test
+    public void testClientDisablesCloseOnNoRoute_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+    {
+        // the connection is opened with the close-when-no-route option set to false
+        final Connection con = createConnectionWithCloseWhenNoRoute(false);
+        try
+        {
+            final Session txSession = con.createSession(true, Session.SESSION_TRANSACTED);
+            final String queueName = getTestQueueName();
+            final AMQSession<?, ?> amqSession = (AMQSession<?, ?>) txSession;
+            final MessageProducer producer =
+                    amqSession.createProducer(txSession.createQueue(queueName), true, false);
+
+            final Message sent = txSession.createMessage();
+            producer.send(sent);
+            txSession.commit();
+            _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(sent, queueName);
+        }
+        finally
+        {
+            con.close();
+        }
+    }
+
+    private Connection createConnectionWithCloseWhenNoRoute(boolean closeWhenNoRoute) throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(closeWhenNoRoute));
+        Connection connection = getConnection(options);
+        connection.setExceptionListener(_testExceptionListener);
+        return connection;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/producer/ImmediateAndMandatoryPublishingTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/producer/ImmediateAndMandatoryPublishingTest.java b/systests/src/test/java/org/apache/qpid/systest/producer/ImmediateAndMandatoryPublishingTest.java
new file mode 100644
index 0000000..9618feb
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/producer/ImmediateAndMandatoryPublishingTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.producer;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assume.assumeThat;
+
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.producer.noroute.UnroutableMessageTestExceptionListener;
+
+public class ImmediateAndMandatoryPublishingTest extends JmsTestBase
+{
+    private UnroutableMessageTestExceptionListener _testExceptionListener;
+    private Connection _connection;
+
+    @Before
+    public void setUp() throws JMSException
+    {
+        assumeThat("AMQP 0-8..0-9-1 specific behaviour", getProtocol(), is(not(equalTo("0-10"))));
+
+        _testExceptionListener = new UnroutableMessageTestExceptionListener();
+        // connect with the close-when-no-route option explicitly set to false
+        _connection = getConnection(
+                Collections.singletonMap(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, String.valueOf(false)));
+        _connection.setExceptionListener(_testExceptionListener);
+    }
+
+    @After
+    public void tearDown() throws JMSException
+    {
+        if (_connection != null)
+        {
+            _connection.close();
+        }
+    }
+
+    @Test
+    public void testPublishP2PWithNoConsumerAndImmediateOnAndAutoAck() throws Exception
+    {
+        publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.AUTO_ACKNOWLEDGE, false);
+    }
+
+    @Test
+    public void testPublishP2PWithNoConsumerAndImmediateOnAndTx() throws Exception
+    {
+        publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.SESSION_TRANSACTED, false);
+    }
+
+    @Test
+    public void testPublishPubSubWithDisconnectedDurableSubscriberAndImmediateOnAndAutoAck() throws Exception
+    {
+        publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.AUTO_ACKNOWLEDGE, true);
+    }
+
+    @Test
+    public void testPublishPubSubWithDisconnectedDurableSubscriberAndImmediateOnAndTx() throws Exception
+    {
+        publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.SESSION_TRANSACTED, true);
+    }
+
+    @Test
+    public void testPublishP2PIntoNonExistingDesitinationWithMandatoryOnAutoAck() throws Exception
+    {
+        publishWithMandatoryOnImmediateOff(Session.AUTO_ACKNOWLEDGE, false);
+    }
+
+    @Test
+    public void testPublishP2PIntoNonExistingDesitinationWithMandatoryOnAndTx() throws Exception
+    {
+        publishWithMandatoryOnImmediateOff(Session.SESSION_TRANSACTED, false);
+    }
+
+    @Test
+    public void testPubSubMandatoryAutoAck() throws Exception
+    {
+        publishWithMandatoryOnImmediateOff(Session.AUTO_ACKNOWLEDGE, true);
+    }
+
+    @Test
+    public void testPubSubMandatoryTx() throws Exception
+    {
+        publishWithMandatoryOnImmediateOff(Session.SESSION_TRANSACTED, true);
+    }
+
+    @Test
+    public void testP2PNoMandatoryAutoAck() throws Exception
+    {
+        publishWithMandatoryOffImmediateOff(Session.AUTO_ACKNOWLEDGE, false);
+    }
+
+    @Test
+    public void testP2PNoMandatoryTx() throws Exception
+    {
+        publishWithMandatoryOffImmediateOff(Session.SESSION_TRANSACTED, false);
+    }
+
+    @Test
+    public void testPubSubWithImmediateOnAndAutoAck() throws Exception
+    {
+        consumerCreateAndClose(true, false);
+
+        Message message = produceMessage(Session.AUTO_ACKNOWLEDGE, true, false, true);
+        _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+    }
+
+    private void publishIntoExistingDestinationWithNoConsumerAndImmediateOn(int acknowledgeMode, boolean pubSub)
+            throws Exception
+    {
+        consumerCreateAndClose(pubSub, true);
+        // mandatory = false, immediate = true
+        Message message = produceMessage(acknowledgeMode, pubSub, false, true);
+        // the listener is expected to report "no consumers" together with the returned message
+        _testExceptionListener.assertReceivedNoConsumersWithReturnedMessage(message);
+    }
+
+    private void publishWithMandatoryOnImmediateOff(int acknowledgeMode, boolean pubSub) throws Exception
+    {
+        Message message = produceMessage(acknowledgeMode, pubSub, true, false);
+        _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+    }
+
+    private void publishWithMandatoryOffImmediateOff(int acknowledgeMode, boolean pubSub) throws Exception
+    {
+        produceMessage(acknowledgeMode, pubSub, false, false);
+        // neither mandatory nor immediate is set, so no exception should reach the listener
+        _testExceptionListener.assertNoException();
+    }
+
+    private void consumerCreateAndClose(boolean pubSub, boolean durable) throws JMSException
+    {
+        Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination;
+        MessageConsumer consumer;
+        if (pubSub)
+        {
+            destination = session.createTopic(getTestQueueName());
+            if (durable)
+            {
+                consumer = session.createDurableSubscriber((Topic) destination, getTestName());
+            }
+            else
+            {
+                consumer = session.createConsumer(destination);
+            }
+        }
+        else
+        {
+            destination = session.createQueue(getTestQueueName());
+            consumer = session.createConsumer(destination);
+        }
+        consumer.close();
+    }
+
+    private Message produceMessage(int acknowledgeMode, boolean pubSub, boolean mandatory, boolean immediate)
+            throws JMSException
+    {
+        Session session = _connection.createSession(acknowledgeMode == Session.SESSION_TRANSACTED, acknowledgeMode);
+        Destination destination;
+        if (pubSub)
+        {
+            destination = session.createTopic(getTestQueueName());
+        }
+        else
+        {
+            destination = session.createQueue(getTestQueueName());
+        }
+
+        MessageProducer producer = ((AMQSession<?, ?>) session).createProducer(destination, mandatory, immediate);
+        Message message = session.createMessage();
+        producer.send(message);
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+        return message;
+    }
+
+    // NOTE(review): this method carries no @Test annotation, unlike the tests above, so JUnit 4
+    // will presumably never run it - confirm whether that is intentional.
+    public void testMandatoryAndImmediateDefaults() throws Exception
+    {
+        final Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final String name = getTestQueueName();
+
+        // publish to non-existent queue - should get mandatory failure
+        final MessageProducer queueProducer = session.createProducer(session.createQueue(name));
+        final Message first = session.createMessage();
+        queueProducer.send(first);
+        _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(first, name);
+
+        // same again through a producer created without a destination
+        final MessageProducer anonymousProducer = session.createProducer(null);
+        final Message second = session.createMessage();
+        anonymousProducer.send(session.createQueue(name), second);
+        _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(second, name);
+
+        // publish to non-existent topic - should get no failure
+        final MessageProducer topicProducer = session.createProducer(session.createTopic(name));
+        topicProducer.send(session.createMessage());
+        _testExceptionListener.assertNoException();
+
+        // and once more to the topic through a new producer created without a destination
+        final MessageProducer secondAnonymous = session.createProducer(null);
+        final Message fourth = session.createMessage();
+        secondAnonymous.send(session.createTopic(name), fourth);
+        _testExceptionListener.assertNoException();
+
+        session.close();
+    }
+
+    // Exercises the qpid.default_mandatory and qpid.default_mandatory_topic system properties
+    // against producers created while each property is set.
+    // NOTE(review): no @Test annotation, so JUnit 4 will presumably never run this - confirm.
+    public void testMandatoryAndImmediateSystemProperties() throws Exception
+    {
+        Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        System.setProperty("qpid.default_mandatory", "true");
+        try
+        {
+            // publish to non-existent topic - should get mandatory failure
+            MessageProducer producer = session.createProducer(session.createTopic(getTestQueueName()));
+            Message message = session.createMessage();
+            producer.send(message);
+            _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+        }
+        finally
+        {
+            System.clearProperty("qpid.default_mandatory");
+        }
+
+        // now set topic specific system property to false - should no longer get mandatory failure on new producer
+        System.setProperty("qpid.default_mandatory_topic", "false");
+        try
+        {
+            MessageProducer producer = session.createProducer(null);
+            Message message = session.createMessage();
+            producer.send(session.createTopic(getTestQueueName()), message);
+            _testExceptionListener.assertNoException();
+        }
+        finally
+        {
+            // clear the property set just above so the topic override does not leak into later tests
+            System.clearProperty("qpid.default_mandatory_topic");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/producer/ReturnUnroutableMandatoryMessageTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/producer/ReturnUnroutableMandatoryMessageTest.java b/systests/src/test/java/org/apache/qpid/systest/producer/ReturnUnroutableMandatoryMessageTest.java
new file mode 100644
index 0000000..179d96b
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/producer/ReturnUnroutableMandatoryMessageTest.java
@@ -0,0 +1,300 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.producer;
+
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQHeadersExchange;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+
+/**
+ * Checks how unroutable messages are handled for the headers, direct and topic exchanges on
+ * AMQP 0-8..0-9-1: a mandatory message that cannot be routed is expected to come back to the
+ * publishing connection, where it reaches {@link #onException(JMSException)} as a
+ * {@link JMSException} linked to an {@link AMQNoRouteException}.
+ *
+ * <p>The test instance registers itself as the {@link ExceptionListener} of the producing
+ * connection and records the bounced message for the assertions.
+ */
+public class ReturnUnroutableMandatoryMessageTest extends JmsTestBase implements ExceptionListener
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReturnUnroutableMandatoryMessageTest.class);
+
+    /** Undelivered message taken from the most recent {@link AMQNoRouteException} seen by the listener. */
+    private Message _bouncedMessage;
+
+    /** Counted down by {@link #onException(JMSException)} once a bounce was recorded; recreated in {@link #setUp()}. */
+    private CountDownLatch _bounceMessageWaiter;
+
+    /**
+     * Skips the test when the protocol under test is 0-10 and creates a fresh latch for the bounce.
+     *
+     * @throws JMSException declared for the benefit of the test base; not thrown by the visible code
+     */
+    @Before
+    public void setUp() throws JMSException
+    {
+        assumeThat("AMQP 0-8..0-9-1 specific behaviour",
+                   getProtocol(),
+                   is(not(equalTo("0-10"))));
+        _bounceMessageWaiter = new CountDownLatch(1);
+    }
+
+    /**
+     * Headers exchange: the consumer queue is bound with header {@code F1000=1}. A non-mandatory
+     * message without that header must be dropped silently, a mandatory one without it must be
+     * bounced, and a message carrying the header must be delivered.
+     *
+     * @throws Exception on any JMS failure; failures are no longer swallowed, so a broken send or
+     *                   receive fails the test instead of letting it pass without assertions
+     */
+    @Test
+    public void testReturnUnroutableMandatoryMessage_HEADERS() throws Exception
+    {
+        Connection con = getConnection();
+        try
+        {
+            AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            AMQHeadersExchange queue =
+                    new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS
+                                                             + "://"
+                                                             + ExchangeDefaults.HEADERS_EXCHANGE_NAME
+                                                             + "/test/queue1?"
+                                                             + BindingURL.OPTION_ROUTING_KEY
+                                                             + "='F0000=1'"));
+
+            // Binding arguments: only messages with header F1000=1 are routed to the queue
+            Map<String, Object> ft = new HashMap<>();
+            ft.put("F1000", "1");
+            consumerSession.declareAndBind(queue, ft);
+
+            MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+            Connection con2 = getConnection();
+            try
+            {
+                con2.setExceptionListener(this);
+                AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+                // Need to start the "producer" connection in order to receive bounced messages
+                LOGGER.info("Starting producer connection");
+                con2.start();
+
+                MessageProducer nonMandatoryProducer = producerSession.createProducer(queue, false, false);
+                // Created with the session defaults, which this test expects to be mandatory
+                MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+
+                // First test - should neither be bounced nor routed
+                LOGGER.info("Sending non-routable non-mandatory message");
+                TextMessage msg1 = producerSession.createTextMessage("msg1");
+                nonMandatoryProducer.send(msg1);
+
+                // Second test - should be bounced
+                LOGGER.info("Sending non-routable mandatory message");
+                TextMessage msg2 = producerSession.createTextMessage("msg2");
+                mandatoryProducer.send(msg2);
+
+                // Third test - should be routed
+                LOGGER.info("Sending routable message");
+                TextMessage msg3 = producerSession.createTextMessage("msg3");
+                msg3.setStringProperty("F1000", "1");
+                mandatoryProducer.send(msg3);
+
+                LOGGER.info("Starting consumer connection");
+                con.start();
+
+                assertRoutedMessage(consumer, "msg3");
+                assertBouncedMessage("msg2");
+            }
+            finally
+            {
+                con2.close();
+            }
+        }
+        finally
+        {
+            con.close();
+        }
+    }
+
+    /**
+     * Direct exchange: a message sent to a queue that has a consumer must be delivered, while a
+     * mandatory message sent to a queue name nobody declared must be bounced.
+     *
+     * @throws Exception on any JMS failure
+     */
+    @Test
+    public void testReturnUnroutableMandatoryMessage_QUEUE() throws Exception
+    {
+        Connection con = getConnection();
+        try
+        {
+            AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            AMQQueue validQueue =
+                    new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE");
+            AMQQueue invalidQueue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+                                                 "testReturnUnroutableMandatoryMessage_QUEUE_INVALID");
+            MessageConsumer consumer = consumerSession.createConsumer(validQueue);
+
+            Connection con2 = getConnection();
+            try
+            {
+                con2.setExceptionListener(this);
+                AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+                // Need to start the "producer" connection in order to receive bounced messages
+                LOGGER.info("Starting producer connection");
+                con2.start();
+
+                MessageProducer nonMandatoryProducer = producerSession.createProducer(validQueue, false, false);
+                // Created with the session defaults, which this test expects to be mandatory
+                MessageProducer mandatoryProducer = producerSession.createProducer(invalidQueue);
+
+                // First test - should be routed
+                LOGGER.info("Sending non-mandatory message");
+                TextMessage msg1 = producerSession.createTextMessage("msg1");
+                nonMandatoryProducer.send(msg1);
+
+                // Second test - should be bounced
+                LOGGER.info("Sending non-routable mandatory message");
+                TextMessage msg2 = producerSession.createTextMessage("msg2");
+                mandatoryProducer.send(msg2);
+
+                LOGGER.info("Starting consumer connection");
+                con.start();
+
+                assertRoutedMessage(consumer, "msg1");
+                assertBouncedMessage("msg2");
+            }
+            finally
+            {
+                con2.close();
+            }
+        }
+        finally
+        {
+            con.close();
+        }
+    }
+
+    /**
+     * Topic exchange: a message sent to a topic that has a subscriber must be delivered, while a
+     * message sent through the "mandatory" producer to a topic without subscribers must be bounced.
+     *
+     * @throws Exception on any JMS failure
+     */
+    @Test
+    public void testReturnUnroutableMandatoryMessage_TOPIC() throws Exception
+    {
+        Connection con = getConnection();
+        try
+        {
+            AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            AMQTopic validTopic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS,
+                                               "test.Return.Unroutable.Mandatory.Message.TOPIC");
+            AMQTopic invalidTopic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS,
+                                                 "test.Return.Unroutable.Mandatory.Message.TOPIC.invalid");
+            MessageConsumer consumer = consumerSession.createConsumer(validTopic);
+
+            Connection con2 = getConnection();
+            try
+            {
+                con2.setExceptionListener(this);
+                AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+                // Need to start the "producer" connection in order to receive bounced messages
+                LOGGER.info("Starting producer connection");
+                con2.start();
+
+                MessageProducer nonMandatoryProducer = producerSession.createProducer(validTopic, false, false);
+                // NOTE(review): flags are passed explicitly here, unlike the queue tests; confirm
+                // against AMQSession#createProducer which of the two booleans is "mandatory".
+                MessageProducer mandatoryProducer = producerSession.createProducer(invalidTopic, false, true);
+
+                // First test - should be routed
+                LOGGER.info("Sending non-mandatory message");
+                TextMessage msg1 = producerSession.createTextMessage("msg1");
+                nonMandatoryProducer.send(msg1);
+
+                // Second test - should be bounced
+                LOGGER.info("Sending non-routable mandatory message");
+                TextMessage msg2 = producerSession.createTextMessage("msg2");
+                mandatoryProducer.send(msg2);
+
+                LOGGER.info("Starting consumer connection");
+                con.start();
+
+                assertRoutedMessage(consumer, "msg1");
+                assertBouncedMessage("msg2");
+            }
+            finally
+            {
+                con2.close();
+            }
+        }
+        finally
+        {
+            con.close();
+        }
+    }
+
+    /**
+     * Records the undelivered message when the producing connection reports a no-route failure.
+     * Any other exception is only logged.
+     *
+     * @param jmsException exception reported by the connection this instance is registered on
+     */
+    @Override
+    public void onException(JMSException jmsException)
+    {
+        Exception linkedException = jmsException.getLinkedException();
+        if (linkedException instanceof AMQNoRouteException)
+        {
+            LOGGER.info("Caught expected NoRouteException");
+            AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
+            // Store the message before releasing the latch so the waiting test thread sees it
+            _bouncedMessage = (Message) noRoute.getUndeliveredMessage();
+            _bounceMessageWaiter.countDown();
+        }
+        else
+        {
+            LOGGER.warn("Caught exception on producer: ", jmsException);
+        }
+    }
+
+    /**
+     * Receives one message from the consumer and asserts that it is a text message with the given body.
+     *
+     * @param consumer     consumer expected to receive the routed message
+     * @param expectedText body the routed message must have
+     * @throws Exception if receiving or reading the message fails
+     */
+    private void assertRoutedMessage(final MessageConsumer consumer, final String expectedText) throws Exception
+    {
+        TextMessage tm = (TextMessage) consumer.receive(getReceiveTimeout());
+
+        assertNotNull("No message routed to receiver", tm);
+        assertTrue("Wrong message routed to receiver: " + tm.getText(), expectedText.equals(tm.getText()));
+    }
+
+    /**
+     * Waits for the exception listener to record a bounced message and asserts that its string
+     * form contains the given text.
+     *
+     * @param expectedText text that must appear in the bounced message's {@code toString()}
+     * @throws Exception if the wait is interrupted
+     */
+    private void assertBouncedMessage(final String expectedText) throws Exception
+    {
+        assertTrue("Message is not bounced",
+                   _bounceMessageWaiter.await(getReceiveTimeout(), TimeUnit.MILLISECONDS));
+        assertNotNull(_bouncedMessage);
+        Message m = _bouncedMessage;
+        assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains(expectedText));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/producer/noroute/UnroutableMessageTestExceptionListener.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/producer/noroute/UnroutableMessageTestExceptionListener.java b/systests/src/test/java/org/apache/qpid/systest/producer/noroute/UnroutableMessageTestExceptionListener.java
new file mode 100644
index 0000000..476a31a
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/producer/noroute/UnroutableMessageTestExceptionListener.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.producer.noroute;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+
+/**
+ * Provides utility methods for checking exceptions that are thrown on the client side when a message is
+ * not routable.
+ *
+ * Exception objects are passed either explicitly as method parameters or implicitly
+ * by previously doing {@link Connection#setExceptionListener(ExceptionListener)}.
+ */
+public class UnroutableMessageTestExceptionListener implements ExceptionListener
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(UnroutableMessageTestExceptionListener.class);
+
+    /**
+     * Number of seconds to check for an event that should NOT happen
+     */
+    private static final int NEGATIVE_TIMEOUT = 2;
+
+    /**
+     * Number of seconds to keep checking for an event that should happen
+     */
+    private static final int POSITIVE_TIMEOUT = 30;
+
+    /**
+     * Hand-over point between the thread calling {@link #onException(JMSException)} and the test
+     * thread. Holds at most one pending exception.
+     */
+    private final BlockingQueue<JMSException> _exceptions = new ArrayBlockingQueue<>(1);
+
+    /**
+     * Stores the reported exception for a later assertion.
+     *
+     * <p>If an earlier exception is still waiting to be consumed the new one is logged and dropped;
+     * {@code offer} is used instead of {@code add} so that a full queue does not throw an
+     * {@link IllegalStateException} back into the caller of the listener.
+     *
+     * @param e exception reported by the connection
+     */
+    @Override
+    public void onException(JMSException e)
+    {
+        LOGGER.info("Received exception " + e);
+        if (!_exceptions.offer(e))
+        {
+            LOGGER.warn("Discarding exception because a previously received one has not been consumed", e);
+        }
+    }
+
+    /**
+     * Waits for an exception and asserts it is a no-route failure for the given queue that carries
+     * the given message.
+     *
+     * @param message           message expected to have been bounced
+     * @param intendedQueueName name expected to appear in the exception messages
+     * @throws Exception if waiting is interrupted or message ids cannot be read
+     */
+    public void assertReceivedNoRouteWithReturnedMessage(Message message, String intendedQueueName) throws Exception
+    {
+        JMSException exception = getReceivedException();
+        assertNoRouteExceptionWithReturnedMessage(exception, message, intendedQueueName);
+    }
+
+    /**
+     * Waits for an exception and asserts it is a no-route failure for the given queue.
+     *
+     * @param intendedQueueName name expected to appear in the exception messages
+     * @throws Exception if waiting is interrupted
+     */
+    public void assertReceivedNoRoute(String intendedQueueName) throws Exception
+    {
+        JMSException exception = getReceivedException();
+        assertNoRoute(exception, intendedQueueName);
+    }
+
+    /**
+     * Waits for an exception and asserts it is linked to an {@link AMQNoConsumersException}
+     * carrying the given message.
+     *
+     * @param message message expected to have been bounced
+     * @throws Exception if waiting is interrupted or message ids cannot be read
+     */
+    public void assertReceivedNoConsumersWithReturnedMessage(Message message) throws Exception
+    {
+        JMSException exception = getReceivedException();
+        Exception linked = exception.getLinkedException();
+        assertNotNull("AMQNoConsumersException should be linked to JMSException", linked);
+        // Check the type explicitly so a wrong exception fails as an assertion, not a ClassCastException
+        assertTrue("Unexpected linked exception " + linked, linked instanceof AMQNoConsumersException);
+
+        AMQNoConsumersException noConsumersException = (AMQNoConsumersException) linked;
+        assertBouncedMessage(message, (Message) noConsumersException.getUndeliveredMessage());
+    }
+
+    /**
+     * Asserts that the given exception is a no-route failure for the given queue and that it
+     * carries the given message.
+     *
+     * @param exception         exception to inspect
+     * @param message           message expected to have been bounced
+     * @param intendedQueueName name expected to appear in the exception messages
+     * @throws Exception if message ids cannot be read
+     */
+    public void assertNoRouteExceptionWithReturnedMessage(
+            JMSException exception, Message message, String intendedQueueName) throws Exception
+    {
+        assertNoRoute(exception, intendedQueueName);
+
+        assertNoRouteException(exception, message);
+    }
+
+    /**
+     * Asserts that the exception is linked to an {@link AMQNoRouteException} carrying the given message.
+     */
+    private void assertNoRouteException(JMSException exception, Message message) throws Exception
+    {
+        Exception linked = exception.getLinkedException();
+        assertNotNull("AMQNoRouteException should be linked to JMSException", linked);
+        assertTrue("Unexpected linked exception " + linked, linked instanceof AMQNoRouteException);
+
+        AMQNoRouteException noRouteException = (AMQNoRouteException) linked;
+        assertBouncedMessage(message, (Message) noRouteException.getUndeliveredMessage());
+    }
+
+    /**
+     * Asserts that the exception and its linked {@link AMQException} both mention the given queue
+     * name and that the linked exception carries error code 312.
+     *
+     * @param exception         exception to inspect
+     * @param intendedQueueName name expected to appear in the exception messages
+     */
+    public void assertNoRoute(JMSException exception, String intendedQueueName)
+    {
+        assertTrue(
+                exception + " message should contain intended queue name",
+                exception.getMessage().contains(intendedQueueName));
+
+        Exception linked = exception.getLinkedException();
+        assertNotNull("AMQException should be linked to JMSException", linked);
+        assertTrue("Unexpected linked exception " + linked, linked instanceof AMQException);
+        AMQException noRouteException = (AMQException) linked;
+
+        assertAMQException("Unexpected error code", 312, noRouteException);
+        assertTrue(
+                "Linked exception " + noRouteException + " message should contain intended queue name",
+                noRouteException.getMessage().contains(intendedQueueName));
+    }
+
+
+    /**
+     * Asserts that no exception is reported within {@link #NEGATIVE_TIMEOUT} seconds.
+     *
+     * @throws Exception if waiting is interrupted
+     */
+    public void assertNoException() throws Exception
+    {
+        assertNull("Unexpected JMSException", _exceptions.poll(NEGATIVE_TIMEOUT, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Waits up to {@link #POSITIVE_TIMEOUT} seconds for an exception and fails if none arrives.
+     */
+    private JMSException getReceivedException() throws Exception
+    {
+        JMSException exception = _exceptions.poll(POSITIVE_TIMEOUT, TimeUnit.SECONDS);
+        assertNotNull("JMSException is expected", exception);
+        return exception;
+    }
+
+    /**
+     * Asserts that a message was bounced and that its JMS message id matches the sent message.
+     */
+    private void assertBouncedMessage(final Message message, final Message bounceMessage) throws Exception
+    {
+        assertNotNull("Bounced Message is expected", bounceMessage);
+
+        assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+    }
+
+    /**
+     * Compares the error code of the exception with the expected numeric code. The code is read
+     * into an {@code Object} because its declared type differs between client versions: an
+     * {@code Integer} is compared directly, anything else by the prefix of its string form.
+     */
+    private void assertAMQException(final String message, final int expected, final AMQException e)
+    {
+        final Object errorCode = e.getErrorCode(); // API change after v6.1
+        if (errorCode instanceof Integer)
+        {
+            assertEquals(message, Integer.valueOf(expected), errorCode);
+        }
+        else
+        {
+            final String fullMessage = String.format("%s. expected actual : %s to start with %d", message, errorCode, expected);
+            final String actual = String.valueOf(errorCode);
+            assertTrue(fullMessage, actual.startsWith(Integer.toString(expected)));
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org