You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/01/10 15:30:22 UTC
qpid-jms-amqp-0-x git commit: QPID-8074: [JMS AMQP 0-x][System Tests]
Copy AMQP 0-8..0-9-1 tests for unroutable messages from Broker-J into client
Repository: qpid-jms-amqp-0-x
Updated Branches:
refs/heads/master daf535410 -> ae33f652d
QPID-8074: [JMS AMQP 0-x][System Tests] Copy AMQP 0-8..0-9-1 tests for unroutable messages from Broker-J into client
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/ae33f652
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/ae33f652
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/ae33f652
Branch: refs/heads/master
Commit: ae33f652dbad7e199bca8544069d123633d41a77
Parents: daf5354
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Jan 10 15:29:55 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Jan 10 15:29:55 2018 +0000
----------------------------------------------------------------------
systests/pom.xml | 1 +
.../apache/qpid/systest/core/BrokerAdmin.java | 2 +
.../apache/qpid/systest/core/JmsTestBase.java | 34 ++-
.../core/brokerj/SpawnQpidBrokerAdmin.java | 29 +-
.../AddressBasedDestinationTest.java | 17 +-
.../CloseOnNoRouteForMandatoryMessageTest.java | 189 ++++++++++++
.../ImmediateAndMandatoryPublishingTest.java | 281 +++++++++++++++++
.../ReturnUnroutableMandatoryMessageTest.java | 300 +++++++++++++++++++
.../UnroutableMessageTestExceptionListener.java | 156 ++++++++++
9 files changed, 990 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/pom.xml
----------------------------------------------------------------------
diff --git a/systests/pom.xml b/systests/pom.xml
index 70bca14..8178bf4 100644
--- a/systests/pom.xml
+++ b/systests/pom.xml
@@ -122,6 +122,7 @@
<configuration>
<systemPropertyVariables>
<qpid.amqp.version>${qpid.amqp.version}</qpid.amqp.version>
+ <qpid.dest_syntax>BURL</qpid.dest_syntax>
</systemPropertyVariables>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
index b2b41cc..befba5f 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
@@ -22,6 +22,7 @@ package org.apache.qpid.systest.core;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
+import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -47,6 +48,7 @@ public interface BrokerAdmin
BrokerType getBrokerType();
Connection getConnection() throws JMSException;
+ Connection getConnection(Map<String, String> options) throws JMSException;
enum PortType
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
index 49cc3b2..12bf145 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
@@ -24,6 +24,8 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assume.assumeThat;
+import java.util.Map;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
@@ -36,6 +38,8 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.configuration.ClientProperties;
+
public abstract class JmsTestBase extends BrokerAdminUsingTestBase
{
@@ -47,22 +51,28 @@ public abstract class JmsTestBase extends BrokerAdminUsingTestBase
@Before
public void setUpTestBase()
{
- assumeThat(String.format("BrokerAdmin is not available. Skipping the test %s#%s",
- getClass().getName(),
- _testName.getMethodName()),
- getBrokerAdmin(), is(notNullValue()));
LOGGER.debug("Test receive timeout is {} milliseconds", getReceiveTimeout());
}
-
protected Connection getConnection() throws JMSException, NamingException
{
+ return getBrokerAdmin().getConnection();
+ }
+
+ @Override
+ public BrokerAdmin getBrokerAdmin()
+ {
+ BrokerAdmin admin = super.getBrokerAdmin();
assumeThat(String.format("BrokerAdmin is not available. Skipping the test %s#%s",
getClass().getName(),
_testName.getMethodName()),
- getBrokerAdmin(), is(notNullValue()));
+ admin, is(notNullValue()));
+ return admin;
+ }
- return getBrokerAdmin().getConnection();
+ protected Connection getConnection(final Map<String, String> options) throws JMSException
+ {
+ return getBrokerAdmin().getConnection(options);
}
protected static long getReceiveTimeout()
@@ -85,4 +95,14 @@ public abstract class JmsTestBase extends BrokerAdminUsingTestBase
{
return (QueueConnection) getConnection();
}
+
+ protected String getProtocol()
+ {
+ return System.getProperty(ClientProperties.AMQP_VERSION, "0-10");
+ }
+
+ protected String getTestQueueName()
+ {
+ return getTestName();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
index 3893288..8842e83 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
@@ -236,6 +236,12 @@ public class SpawnQpidBrokerAdmin implements BrokerAdmin
return createConnection(_virtualHostNodeName);
}
+ @Override
+ public Connection getConnection(final Map<String, String> options) throws JMSException
+ {
+ return createConnection(_virtualHostNodeName, options);
+ }
+
private void startBroker(final Class testClass)
{
try
@@ -632,16 +638,29 @@ public class SpawnQpidBrokerAdmin implements BrokerAdmin
private Connection createConnection(String virtualHostName) throws JMSException
{
+ return createConnection(virtualHostName, null);
+ }
+
+ private Connection createConnection(String virtualHostName,
+ final Map<String, String> options) throws JMSException
+ {
final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
final String factoryName = "connectionFactory";
String urlTemplate = "amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''";
- String url = String.format(urlTemplate,
- "spawn_broker_admin",
- virtualHostName,
- getBrokerAddress(PortType.AMQP).getPort());
- initialContextEnvironment.put("connectionfactory." + factoryName, url);
+ StringBuilder url = new StringBuilder(String.format(urlTemplate,
+ "spawn_broker_admin",
+ virtualHostName,
+ getBrokerAddress(PortType.AMQP).getPort()));
+ if (options != null)
+ {
+ for (Map.Entry<String, String> option : options.entrySet())
+ {
+ url.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'");
+ }
+ }
+ initialContextEnvironment.put("connectionfactory." + factoryName, url.toString());
try
{
InitialContext initialContext = new InitialContext(initialContextEnvironment);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java b/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
index 711a50e..cced94d 100644
--- a/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/destination/AddressBasedDestinationTest.java
@@ -309,7 +309,7 @@ public class AddressBasedDestinationTest extends JmsTestBase
@Test
public void testCreateExchangeWithArgs() throws Exception
{
- assumeThat("Not supported by Broker-J", isCppBroker(), is(equalTo(true)));
+ assumeThat("QPID-3392: Not supported by Broker-J", isCppBroker(), is(equalTo(true)));
createExchangeImpl(true, false, false);
}
@@ -321,6 +321,10 @@ public class AddressBasedDestinationTest extends JmsTestBase
@Test
public void testCreateExchangeWithNonsenseArgs() throws Exception
{
+ assumeThat("Broker-J is required",
+ getBrokerAdmin().getBrokerType(),
+ is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
createExchangeImpl(true, true, false);
}
@@ -782,7 +786,7 @@ public class AddressBasedDestinationTest extends JmsTestBase
@Test
public void testSessionCreateTopicWithExchangeArgs() throws Exception
{
- assumeThat("Not supported by Broker-J", isCppBroker(), is(equalTo(true)));
+ assumeThat("QPID-3392: Not supported by Broker-J", isCppBroker(), is(equalTo(true)));
sessionCreateTopicImpl(true);
}
@@ -1195,6 +1199,10 @@ public class AddressBasedDestinationTest extends JmsTestBase
@Test
public void testDeleteOptions() throws Exception
{
+ assumeThat("Broker-J is required",
+ getBrokerAdmin().getBrokerType(),
+ is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
Session jmsSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// default (create never, assert never) -------------------
@@ -1735,11 +1743,6 @@ public class AddressBasedDestinationTest extends JmsTestBase
}
}
- private String getProtocol()
- {
- return System.getProperty(ClientProperties.AMQP_VERSION, "0-10");
- }
-
private boolean isBroker010()
{
return "0-10".equals(getProtocol());
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/producer/CloseOnNoRouteForMandatoryMessageTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/producer/CloseOnNoRouteForMandatoryMessageTest.java b/systests/src/test/java/org/apache/qpid/systest/producer/CloseOnNoRouteForMandatoryMessageTest.java
new file mode 100644
index 0000000..03b2458
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/producer/CloseOnNoRouteForMandatoryMessageTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.producer;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.IllegalStateException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.producer.noroute.UnroutableMessageTestExceptionListener;
+
+public class CloseOnNoRouteForMandatoryMessageTest extends JmsTestBase
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(CloseOnNoRouteForMandatoryMessageTest.class);
+
+ private UnroutableMessageTestExceptionListener _testExceptionListener;
+
+ @Before
+ public void setUp()
+ {
+ _testExceptionListener = new UnroutableMessageTestExceptionListener();
+
+ assumeThat("Feature 'close on no route' is only implemented Broker-J",
+ getBrokerAdmin().getBrokerType(),
+ is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
+ assumeThat("Feature 'close on no route' is only implemented for AMQP 0-8..0-9-1",
+ getProtocol(),
+ is(not(equalTo("0-10"))));
+ }
+
+ @Test
+ public void testNoRoute_brokerClosesConnection() throws Exception
+ {
+ Connection connection = createConnectionWithCloseWhenNoRoute(true);
+ try
+ {
+ Session transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ String testQueueName = getTestQueueName();
+ MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer(
+ transactedSession.createQueue(testQueueName),
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+
+ _testExceptionListener.assertReceivedNoRoute(testQueueName);
+
+ try
+ {
+ transactedSession.commit();
+ fail("Expected exception not thrown");
+ }
+ catch (IllegalStateException ise)
+ {
+ LOGGER.debug("Caught exception", ise);
+ //The session was marked closed even before we had a chance to call commit on it
+ assertTrue("ISE did not indicate closure", ise.getMessage().contains("closed"));
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void testNoRouteForNonMandatoryMessage_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+ {
+ Connection connection = createConnectionWithCloseWhenNoRoute(true);
+ try
+ {
+ Session transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ String testQueueName = getTestQueueName();
+ MessageProducer nonMandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer(
+ transactedSession.createQueue(testQueueName),
+ false, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ nonMandatoryProducer.send(message);
+
+ // should succeed - the message is simply discarded
+ transactedSession.commit();
+
+ _testExceptionListener.assertNoException();
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void testNoRouteOnNonTransactionalSession_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+ {
+ Connection connection = createConnectionWithCloseWhenNoRoute(true);
+ try
+ {
+ Session nonTransactedSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String testQueueName = getTestQueueName();
+ MessageProducer mandatoryProducer = ((AMQSession<?, ?>) nonTransactedSession).createProducer(
+ nonTransactedSession.createQueue(testQueueName),
+ true, // mandatory
+ false); // immediate
+
+ Message message = nonTransactedSession.createMessage();
+ mandatoryProducer.send(message);
+
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, testQueueName);
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void testClientDisablesCloseOnNoRoute_brokerKeepsConnectionOpenAndCallsExceptionListener() throws Exception
+ {
+ Connection connection = createConnectionWithCloseWhenNoRoute(false);
+ try
+ {
+ Session transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ String testQueueName = getTestQueueName();
+ MessageProducer mandatoryProducer = ((AMQSession<?, ?>) transactedSession).createProducer(
+ transactedSession.createQueue(testQueueName),
+ true, // mandatory
+ false); // immediate
+
+ Message message = transactedSession.createMessage();
+ mandatoryProducer.send(message);
+ transactedSession.commit();
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, testQueueName);
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+    /**
+     * Opens a connection whose URL carries the {@link ConnectionURL#OPTIONS_CLOSE_WHEN_NO_ROUTE}
+     * option set to the given value, and registers the test's exception listener on it.
+     *
+     * @param closeWhenNoRoute value to pass for the "close when no route" connection option
+     * @return the new connection, with {@code _testExceptionListener} installed
+     */
+    private Connection createConnectionWithCloseWhenNoRoute(boolean closeWhenNoRoute) throws Exception
+    {
+        final Map<String, String> connectionOptions = new HashMap<>();
+        connectionOptions.put(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, String.valueOf(closeWhenNoRoute));
+
+        final Connection result = getConnection(connectionOptions);
+        result.setExceptionListener(_testExceptionListener);
+        return result;
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/producer/ImmediateAndMandatoryPublishingTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/producer/ImmediateAndMandatoryPublishingTest.java b/systests/src/test/java/org/apache/qpid/systest/producer/ImmediateAndMandatoryPublishingTest.java
new file mode 100644
index 0000000..9618feb
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/producer/ImmediateAndMandatoryPublishingTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.producer;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assume.assumeThat;
+
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.producer.noroute.UnroutableMessageTestExceptionListener;
+
+public class ImmediateAndMandatoryPublishingTest extends JmsTestBase
+{
+ private UnroutableMessageTestExceptionListener _testExceptionListener;
+ private Connection _connection;
+
+ @Before
+ public void setUp() throws JMSException
+ {
+ assumeThat("AMQP 0-8..0-9-1 specific behaviour",
+ getProtocol(),
+ is(not(equalTo("0-10"))));
+
+ _testExceptionListener = new UnroutableMessageTestExceptionListener();
+ _connection = getConnection(Collections.singletonMap(ConnectionURL.OPTIONS_CLOSE_WHEN_NO_ROUTE, Boolean.toString(false)));
+ _connection.setExceptionListener(_testExceptionListener);
+ }
+
+ @After
+ public void tearDown() throws JMSException
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+
+ @Test
+ public void testPublishP2PWithNoConsumerAndImmediateOnAndAutoAck() throws Exception
+ {
+ publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ @Test
+ public void testPublishP2PWithNoConsumerAndImmediateOnAndTx() throws Exception
+ {
+ publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.SESSION_TRANSACTED, false);
+ }
+
+ @Test
+ public void testPublishPubSubWithDisconnectedDurableSubscriberAndImmediateOnAndAutoAck() throws Exception
+ {
+ publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+ @Test
+ public void testPublishPubSubWithDisconnectedDurableSubscriberAndImmediateOnAndTx() throws Exception
+ {
+ publishIntoExistingDestinationWithNoConsumerAndImmediateOn(Session.SESSION_TRANSACTED, true);
+ }
+
+ @Test
+ public void testPublishP2PIntoNonExistingDesitinationWithMandatoryOnAutoAck() throws Exception
+ {
+ publishWithMandatoryOnImmediateOff(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ @Test
+ public void testPublishP2PIntoNonExistingDesitinationWithMandatoryOnAndTx() throws Exception
+ {
+ publishWithMandatoryOnImmediateOff(Session.SESSION_TRANSACTED, false);
+ }
+
+ @Test
+ public void testPubSubMandatoryAutoAck() throws Exception
+ {
+ publishWithMandatoryOnImmediateOff(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+ @Test
+ public void testPubSubMandatoryTx() throws Exception
+ {
+ publishWithMandatoryOnImmediateOff(Session.SESSION_TRANSACTED, true);
+ }
+
+ @Test
+ public void testP2PNoMandatoryAutoAck() throws Exception
+ {
+ publishWithMandatoryOffImmediateOff(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ @Test
+ public void testP2PNoMandatoryTx() throws Exception
+ {
+ publishWithMandatoryOffImmediateOff(Session.SESSION_TRANSACTED, false);
+ }
+
+ @Test
+ public void testPubSubWithImmediateOnAndAutoAck() throws Exception
+ {
+ consumerCreateAndClose(true, false);
+
+ Message message = produceMessage(Session.AUTO_ACKNOWLEDGE, true, false, true);
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+ }
+
+ private void publishIntoExistingDestinationWithNoConsumerAndImmediateOn(int acknowledgeMode, boolean pubSub)
+ throws Exception
+ {
+ consumerCreateAndClose(pubSub, true);
+
+ Message message = produceMessage(acknowledgeMode, pubSub, false, true);
+
+ _testExceptionListener.assertReceivedNoConsumersWithReturnedMessage(message);
+ }
+
+ private void publishWithMandatoryOnImmediateOff(int acknowledgeMode, boolean pubSub) throws Exception
+ {
+ Message message = produceMessage(acknowledgeMode, pubSub, true, false);
+ _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+ }
+
+ private void publishWithMandatoryOffImmediateOff(int acknowledgeMode, boolean pubSub) throws Exception
+ {
+ produceMessage(acknowledgeMode, pubSub, false, false);
+
+ _testExceptionListener.assertNoException();
+ }
+
+    /**
+     * Creates a consumer on the test destination and closes it again straight away.
+     *
+     * @param pubSub  {@code true} to use a topic named after the test, {@code false} to use a queue
+     * @param durable when {@code pubSub} is set, {@code true} creates a durable subscriber named
+     *                after the test instead of a plain consumer; ignored for queues
+     */
+    private void consumerCreateAndClose(boolean pubSub, boolean durable) throws JMSException
+    {
+        final Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageConsumer consumer;
+        if (pubSub && durable)
+        {
+            final Topic topic = session.createTopic(getTestQueueName());
+            consumer = session.createDurableSubscriber(topic, getTestName());
+        }
+        else
+        {
+            final Destination destination = pubSub
+                    ? session.createTopic(getTestQueueName())
+                    : session.createQueue(getTestQueueName());
+            consumer = session.createConsumer(destination);
+        }
+        consumer.close();
+    }
+
+    /**
+     * Sends one empty message to the test destination using a producer created with the given
+     * mandatory/immediate flags, committing the session if it is transacted.
+     *
+     * @param acknowledgeMode session acknowledge mode; {@link Session#SESSION_TRANSACTED} makes
+     *                        the session transacted
+     * @param pubSub          {@code true} to publish to a topic, {@code false} to a queue
+     * @param mandatory       mandatory flag passed to the producer
+     * @param immediate       immediate flag passed to the producer
+     * @return the message that was sent
+     */
+    private Message produceMessage(int acknowledgeMode, boolean pubSub, boolean mandatory, boolean immediate)
+            throws JMSException
+    {
+        final boolean transacted = acknowledgeMode == Session.SESSION_TRANSACTED;
+        final Session session = _connection.createSession(transacted, acknowledgeMode);
+        final Destination destination = pubSub
+                ? session.createTopic(getTestQueueName())
+                : session.createQueue(getTestQueueName());
+
+        final MessageProducer producer =
+                ((AMQSession<?, ?>) session).createProducer(destination, mandatory, immediate);
+        final Message sent = session.createMessage();
+        producer.send(sent);
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+        return sent;
+    }
+
+    /**
+     * Checks the client's default mandatory behaviour for producers created without explicit
+     * mandatory/immediate flags. Publishing to a non-existent queue is expected to be reported
+     * to the exception listener as "no route" with the message returned; publishing to a
+     * non-existent topic is expected to raise nothing. Each case is exercised with a producer
+     * bound to the destination and with an anonymous producer.
+     */
+    @Test // without this annotation the JUnit 4 runner never executes the method
+    public void testMandatoryAndImmediateDefaults() throws Exception
+    {
+        Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // publish to non-existent queue - should get mandatory failure
+        MessageProducer producer = session.createProducer(session.createQueue(getTestQueueName()));
+        Message message = session.createMessage();
+        producer.send(message);
+
+        _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+
+        // same again through an anonymous producer, destination supplied at send time
+        producer = session.createProducer(null);
+        message = session.createMessage();
+        producer.send(session.createQueue(getTestQueueName()), message);
+
+        _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+
+        // publish to non-existent topic - should get no failure
+        producer = session.createProducer(session.createTopic(getTestQueueName()));
+        message = session.createMessage();
+        producer.send(message);
+
+        _testExceptionListener.assertNoException();
+
+        // same again through an anonymous producer, destination supplied at send time
+        producer = session.createProducer(null);
+        message = session.createMessage();
+        producer.send(session.createTopic(getTestQueueName()), message);
+
+        _testExceptionListener.assertNoException();
+
+        session.close();
+    }
+
+    /**
+     * Exercises the {@code qpid.default_mandatory} and {@code qpid.default_mandatory_topic}
+     * system properties. With {@code qpid.default_mandatory=true} a publish to a non-existent
+     * topic is expected to be reported as "no route"; with
+     * {@code qpid.default_mandatory_topic=false} a publish to the topic through an anonymous
+     * producer is expected to raise nothing. Each property is cleared again in a
+     * {@code finally} block so that it cannot leak into other tests running in the same JVM.
+     */
+    // NOTE(review): this method carries no @Test annotation, so the JUnit 4 runner does not
+    // execute it. The session is created before the properties are set - confirm when the
+    // client reads them before enabling the test.
+    public void testMandatoryAndImmediateSystemProperties() throws Exception
+    {
+        Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        System.setProperty("qpid.default_mandatory", "true");
+        try
+        {
+            // publish to non-existent topic - should get mandatory failure
+            MessageProducer producer = session.createProducer(session.createTopic(getTestQueueName()));
+            Message message = session.createMessage();
+            producer.send(message);
+
+            _testExceptionListener.assertReceivedNoRouteWithReturnedMessage(message, getTestQueueName());
+        }
+        finally
+        {
+            System.clearProperty("qpid.default_mandatory");
+        }
+
+        // now set topic specific system property to false - should no longer get mandatory failure on new producer
+        System.setProperty("qpid.default_mandatory_topic", "false");
+        try
+        {
+            MessageProducer producer = session.createProducer(null);
+            Message message = session.createMessage();
+            producer.send(session.createTopic(getTestQueueName()), message);
+
+            _testExceptionListener.assertNoException();
+        }
+        finally
+        {
+            // clear the property that was set for this section (previously the wrong key was cleared)
+            System.clearProperty("qpid.default_mandatory_topic");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/producer/ReturnUnroutableMandatoryMessageTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/producer/ReturnUnroutableMandatoryMessageTest.java b/systests/src/test/java/org/apache/qpid/systest/producer/ReturnUnroutableMandatoryMessageTest.java
new file mode 100644
index 0000000..179d96b
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/producer/ReturnUnroutableMandatoryMessageTest.java
@@ -0,0 +1,300 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.producer;
+
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQHeadersExchange;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+
+ /**
+  * System tests for the return of unroutable messages to the publishing connection.
+  * <p>
+  * Each test publishes over a second connection that has this test instance installed as its
+  * {@link ExceptionListener}. A message that is bounced arrives in {@link #onException(JMSException)}
+  * wrapped in an {@link AMQNoRouteException}; the test then checks both the message that was routed
+  * to the consumer and the message that was bounced.
+  * <p>
+  * The tests are skipped when the protocol in use is "0-10" (see {@link #setUp()}).
+  */
+ public class ReturnUnroutableMandatoryMessageTest extends JmsTestBase implements ExceptionListener
+ {
+     private static final Logger LOGGER = LoggerFactory.getLogger(ReturnUnroutableMandatoryMessageTest.class);
+
+     // Message taken from the AMQNoRouteException seen by onException(). It is written before the latch
+     // is counted down and read only after await() has returned.
+     private Message _bouncedMessage;
+
+     // Released by onException() once a bounced message has been captured.
+     private CountDownLatch _bounceMessageWaiter;
+
+     /**
+      * Skips the test for protocol "0-10" and creates a fresh latch for the bounced message.
+      */
+     @Before
+     public void setUp() throws JMSException
+     {
+         assumeThat("AMQP 0-8..0-9-1 specific behaviour",
+                    getProtocol(),
+                    is(not(equalTo("0-10"))));
+         _bounceMessageWaiter = new CountDownLatch(1);
+     }
+
+     /**
+      * Publishes three messages to a headers exchange destination bound with the header {@code F1000=1}:
+      * a non-mandatory one without the header, a mandatory one without the header (expected to bounce)
+      * and a mandatory one carrying the header (expected to reach the consumer).
+      * <p>
+      * JMS failures are propagated so that a broken publish or receive fails the test instead of
+      * silently skipping the assertions.
+      */
+     @Test
+     public void testReturnUnroutableMandatoryMessage_HEADERS() throws Exception
+     {
+         Connection con = getConnection();
+         try
+         {
+             AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+             AMQHeadersExchange queue =
+                     new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS
+                                                              + "://"
+                                                              + ExchangeDefaults.HEADERS_EXCHANGE_NAME
+                                                              + "/test/queue1?"
+                                                              + BindingURL.OPTION_ROUTING_KEY
+                                                              + "='F0000=1'"));
+
+             Map<String, Object> ft = new HashMap<>();
+             ft.put("F1000", "1");
+             consumerSession.declareAndBind(queue, ft);
+
+             MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+             Connection con2 = getConnection();
+             try
+             {
+                 con2.setExceptionListener(this);
+                 AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+                 // Need to start the "producer" connection in order to receive bounced messages
+                 LOGGER.info("Starting producer connection");
+                 con2.start();
+
+                 MessageProducer nonMandatoryProducer = producerSession.createProducer(queue, false, false);
+                 MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+
+                 // First test - should neither be bounced nor routed
+                 LOGGER.info("Sending non-routable non-mandatory message");
+                 TextMessage msg1 = producerSession.createTextMessage("msg1");
+                 nonMandatoryProducer.send(msg1);
+
+                 // Second test - should be bounced
+                 LOGGER.info("Sending non-routable mandatory message");
+                 TextMessage msg2 = producerSession.createTextMessage("msg2");
+                 mandatoryProducer.send(msg2);
+
+                 // Third test - should be routed
+                 LOGGER.info("Sending routable message");
+                 TextMessage msg3 = producerSession.createTextMessage("msg3");
+                 msg3.setStringProperty("F1000", "1");
+                 mandatoryProducer.send(msg3);
+
+                 LOGGER.info("Starting consumer connection");
+                 con.start();
+
+                 assertMessageRouted(consumer, "msg3");
+                 assertMessageBounced("msg2");
+             }
+             finally
+             {
+                 con2.close();
+             }
+         }
+         finally
+         {
+             con.close();
+         }
+     }
+
+     /**
+      * Sends "msg1" with a non-mandatory producer to a queue that has a consumer and "msg2" with a
+      * producer on a second queue name that has no consumer; expects "msg1" to be received and
+      * "msg2" to be bounced.
+      */
+     @Test
+     public void testReturnUnroutableMandatoryMessage_QUEUE() throws Exception
+     {
+         Connection con = getConnection();
+         try
+         {
+             AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+             AMQQueue valid_queue =
+                     new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE");
+             AMQQueue invalid_queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
+                                                   "testReturnUnroutableMandatoryMessage_QUEUE_INVALID");
+             MessageConsumer consumer = consumerSession.createConsumer(valid_queue);
+
+             Connection con2 = getConnection();
+             try
+             {
+                 con2.setExceptionListener(this);
+                 AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+                 // Need to start the "producer" connection in order to receive bounced messages
+                 LOGGER.info("Starting producer connection");
+                 con2.start();
+
+                 MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_queue, false, false);
+                 MessageProducer mandatoryProducer = producerSession.createProducer(invalid_queue);
+
+                 // First test - should be routed
+                 LOGGER.info("Sending non-mandatory message");
+                 TextMessage msg1 = producerSession.createTextMessage("msg1");
+                 nonMandatoryProducer.send(msg1);
+
+                 // Second test - should be bounced
+                 LOGGER.info("Sending non-routable mandatory message");
+                 TextMessage msg2 = producerSession.createTextMessage("msg2");
+                 mandatoryProducer.send(msg2);
+
+                 LOGGER.info("Starting consumer connection");
+                 con.start();
+
+                 assertMessageRouted(consumer, "msg1");
+                 assertMessageBounced("msg2");
+             }
+             finally
+             {
+                 con2.close();
+             }
+         }
+         finally
+         {
+             con.close();
+         }
+     }
+
+     /**
+      * Sends "msg1" to a topic that has a consumer and "msg2" to a second topic that has none;
+      * expects "msg1" to be received and "msg2" to be bounced.
+      */
+     @Test
+     public void testReturnUnroutableMandatoryMessage_TOPIC() throws Exception
+     {
+         Connection con = getConnection();
+         try
+         {
+             AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+             AMQTopic valid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS,
+                                                 "test.Return.Unroutable.Mandatory.Message.TOPIC");
+             AMQTopic invalid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS,
+                                                   "test.Return.Unroutable.Mandatory.Message.TOPIC.invalid");
+             MessageConsumer consumer = consumerSession.createConsumer(valid_topic);
+
+             Connection con2 = getConnection();
+             try
+             {
+                 con2.setExceptionListener(this);
+                 AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+                 // Need to start the "producer" connection in order to receive bounced messages
+                 LOGGER.info("Starting producer connection");
+                 con2.start();
+
+                 MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_topic, false, false);
+                 MessageProducer mandatoryProducer = producerSession.createProducer(invalid_topic, false, true);
+
+                 // First test - should be routed
+                 LOGGER.info("Sending non-mandatory message");
+                 TextMessage msg1 = producerSession.createTextMessage("msg1");
+                 nonMandatoryProducer.send(msg1);
+
+                 // Second test - should be bounced
+                 LOGGER.info("Sending non-routable mandatory message");
+                 TextMessage msg2 = producerSession.createTextMessage("msg2");
+                 mandatoryProducer.send(msg2);
+
+                 LOGGER.info("Starting consumer connection");
+                 con.start();
+
+                 assertMessageRouted(consumer, "msg1");
+                 assertMessageBounced("msg2");
+             }
+             finally
+             {
+                 con2.close();
+             }
+         }
+         finally
+         {
+             con.close();
+         }
+     }
+
+     /**
+      * Records the undelivered message when the linked exception is an {@link AMQNoRouteException}
+      * and releases the latch the tests wait on; any other exception is only logged.
+      *
+      * @param jmsException exception reported on the producer connection
+      */
+     @Override
+     public void onException(JMSException jmsException)
+     {
+         Exception linkedException = jmsException.getLinkedException();
+         if (linkedException instanceof AMQNoRouteException)
+         {
+             LOGGER.info("Caught expected NoRouteException");
+             AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
+             // Store the message before counting down so the waiting test thread observes it.
+             _bouncedMessage = (Message) noRoute.getUndeliveredMessage();
+             _bounceMessageWaiter.countDown();
+         }
+         else
+         {
+             LOGGER.warn("Caught exception on producer: ", jmsException);
+         }
+     }
+
+     // Receives the next message from the consumer and checks that its text equals expectedText.
+     private void assertMessageRouted(MessageConsumer consumer, String expectedText) throws Exception
+     {
+         TextMessage tm = (TextMessage) consumer.receive(getReceiveTimeout());
+
+         assertNotNull("No message routed to receiver", tm);
+         assertTrue("Wrong message routed to receiver: " + tm.getText(), expectedText.equals(tm.getText()));
+     }
+
+     // Waits for onException() to capture a bounced message and checks its string form contains expectedText.
+     private void assertMessageBounced(String expectedText) throws Exception
+     {
+         assertTrue("Message is not bounced",
+                    _bounceMessageWaiter.await(getReceiveTimeout(), TimeUnit.MILLISECONDS));
+         assertNotNull(_bouncedMessage);
+         Message m = _bouncedMessage;
+         assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains(expectedText));
+     }
+ }
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ae33f652/systests/src/test/java/org/apache/qpid/systest/producer/noroute/UnroutableMessageTestExceptionListener.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/producer/noroute/UnroutableMessageTestExceptionListener.java b/systests/src/test/java/org/apache/qpid/systest/producer/noroute/UnroutableMessageTestExceptionListener.java
new file mode 100644
index 0000000..476a31a
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/producer/noroute/UnroutableMessageTestExceptionListener.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.producer.noroute;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+
+/**
+ * Provides utility methods for checking exceptions that are thrown on the client side when a message is
+ * not routable.
+ *
+ * Exception objects are passed either explicitly as method parameters or implicitly
+ * by previously doing {@link Connection#setExceptionListener(ExceptionListener)}.
+ */
+ public class UnroutableMessageTestExceptionListener implements ExceptionListener
+ {
+     private static final Logger LOGGER = LoggerFactory.getLogger(UnroutableMessageTestExceptionListener.class);
+
+     /**
+      * Number of seconds to wait for an event that should NOT happen
+      */
+     private static final int NEGATIVE_TIMEOUT = 2;
+
+     /**
+      * Number of seconds to keep waiting for an event that should happen
+      */
+     private static final int POSITIVE_TIMEOUT = 30;
+
+     /**
+      * Holds the exception most recently reported to {@link #onException(JMSException)} until one of
+      * the assert methods polls it. The capacity is one.
+      */
+     private final BlockingQueue<JMSException> _exceptions = new ArrayBlockingQueue<>(1);
+
+     /**
+      * Stores the reported exception for a later assertion.
+      * <p>
+      * If an earlier exception has not been consumed yet, the new one is logged and discarded rather
+      * than letting the bounded queue throw {@link IllegalStateException} on the calling thread.
+      *
+      * @param e exception reported by the connection
+      */
+     @Override
+     public void onException(JMSException e)
+     {
+         LOGGER.info("Received exception " + e);
+         if (!_exceptions.offer(e))
+         {
+             LOGGER.warn("Discarding exception because a previously received one has not been consumed yet", e);
+         }
+     }
+
+     /**
+      * Waits for an exception to be received and checks that it is a no-route error for
+      * {@code intendedQueueName} carrying {@code message} as the returned message.
+      */
+     public void assertReceivedNoRouteWithReturnedMessage(Message message, String intendedQueueName) throws Exception
+     {
+         JMSException exception = getReceivedException();
+         assertNoRouteExceptionWithReturnedMessage(exception, message, intendedQueueName);
+     }
+
+     /**
+      * Waits for an exception to be received and checks that it is a no-route error for
+      * {@code intendedQueueName}.
+      */
+     public void assertReceivedNoRoute(String intendedQueueName) throws Exception
+     {
+         JMSException exception = getReceivedException();
+         assertNoRoute(exception, intendedQueueName);
+     }
+
+     /**
+      * Waits for an exception to be received and checks that its linked exception is an
+      * {@link AMQNoConsumersException} whose undelivered message has the same JMS message ID as
+      * {@code message}.
+      */
+     public void assertReceivedNoConsumersWithReturnedMessage(Message message) throws Exception
+     {
+         JMSException exception = getReceivedException();
+         AMQNoConsumersException noConsumersException = (AMQNoConsumersException) exception.getLinkedException();
+         assertNotNull("AMQNoConsumersException should be linked to JMSException", noConsumersException);
+         Message bounceMessage = (Message) noConsumersException.getUndeliveredMessage();
+         assertNotNull("Bounced Message is expected", bounceMessage);
+
+         assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+     }
+
+     /**
+      * Checks that the explicitly supplied {@code exception} is a no-route error for
+      * {@code intendedQueueName} and that it carries {@code message} as the returned message.
+      */
+     public void assertNoRouteExceptionWithReturnedMessage(
+             JMSException exception, Message message, String intendedQueueName) throws Exception
+     {
+         assertNoRoute(exception, intendedQueueName);
+
+         assertNoRouteException(exception, message);
+     }
+
+     // Checks that the linked AMQNoRouteException returns a message with the same JMS message ID as 'message'.
+     private void assertNoRouteException(JMSException exception, Message message) throws Exception
+     {
+         AMQNoRouteException noRouteException = (AMQNoRouteException) exception.getLinkedException();
+         assertNotNull("AMQNoRouteException should be linked to JMSException", noRouteException);
+         Message bounceMessage = (Message) noRouteException.getUndeliveredMessage();
+         assertNotNull("Bounced Message is expected", bounceMessage);
+
+         assertEquals("Unexpected message is bounced", message.getJMSMessageID(), bounceMessage.getJMSMessageID());
+     }
+
+     /**
+      * Checks that both {@code exception} and its linked {@link AMQException} mention
+      * {@code intendedQueueName} in their messages and that the linked exception has error code 312.
+      */
+     public void assertNoRoute(JMSException exception, String intendedQueueName)
+     {
+         assertTrue(
+                 exception + " message should contain intended queue name",
+                 exception.getMessage().contains(intendedQueueName));
+
+         AMQException noRouteException = (AMQException) exception.getLinkedException();
+         assertNotNull("AMQException should be linked to JMSException", noRouteException);
+
+         assertAMQException("Unexpected error code", 312, noRouteException);
+         assertTrue(
+                 "Linked exception " + noRouteException + " message should contain intended queue name",
+                 noRouteException.getMessage().contains(intendedQueueName));
+     }
+
+     /**
+      * Waits {@link #NEGATIVE_TIMEOUT} seconds and fails if an exception is received in that time.
+      */
+     public void assertNoException() throws Exception
+     {
+         assertNull("Unexpected JMSException", _exceptions.poll(NEGATIVE_TIMEOUT, TimeUnit.SECONDS));
+     }
+
+     // Waits up to POSITIVE_TIMEOUT seconds for an exception and fails if none arrives.
+     private JMSException getReceivedException() throws Exception
+     {
+         JMSException exception = _exceptions.poll(POSITIVE_TIMEOUT, TimeUnit.SECONDS);
+         assertNotNull("JMSException is expected", exception);
+         return exception;
+     }
+
+     // Compares the error code of 'e' with 'expected'. When the code is not an Integer, its string
+     // form only has to start with the expected number.
+     private void assertAMQException(final String message, final int expected, final AMQException e)
+     {
+         Object object = e.getErrorCode(); // API change after v6.1
+         if (object instanceof Integer)
+         {
+             assertEquals(message, expected, e.getErrorCode());
+         }
+         else
+         {
+             final String fullMessage = String.format("%s. expected actual : %s to start with %d", message, e.getErrorCode(), expected);
+             final String actual = String.valueOf(e.getErrorCode());
+             assertTrue(fullMessage, actual.startsWith(Integer.toString(expected)));
+         }
+     }
+
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org