You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/04 23:43:10 UTC
[26/58] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
deleted file mode 100644
index d05a5c7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit test for virtual topics and DLQ messaging. See individual test for more
- * detail
- */
-public class VirtualTopicDLQTest extends TestCase {
-
- private static BrokerService broker;
-
- private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDLQTest.class);
-
- static final String jmsConnectionURI = "failover:(vm://localhost)";
-
- // Virtual Topic that the test publishes numberMessages (6) messages to
- private static final String virtualTopicName = "VirtualTopic.Test";
-
- // Queues that receive all the messages sent to the virtual topic
- private static final String consumer1Prefix = "Consumer.A.";
- private static final String consumer2Prefix = "Consumer.B.";
- private static final String consumer3Prefix = "Consumer.C.";
-
- // Expected Individual Dead Letter Queue names that are tied to the
- // Subscriber Queues
- private static final String dlqPrefix = "ActiveMQ.DLQ.Topic.";
-
- // Number of messages
- private static final int numberMessages = 6;
-
- @Override
- @Before
- public void setUp() throws Exception {
- try {
- broker = BrokerFactory.createBroker("xbean:org/apache/activemq/broker/virtual/virtual-individual-dlq.xml", true);
- broker.start();
- broker.waitUntilStarted();
- }
- catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- try {
- // Purge the DLQ's so counts are correct for next run
- purgeDestination(dlqPrefix + consumer1Prefix + virtualTopicName);
- purgeDestination(dlqPrefix + consumer2Prefix + virtualTopicName);
- purgeDestination(dlqPrefix + consumer3Prefix + virtualTopicName);
- }
- catch (Exception e) {
- e.printStackTrace();
- }
-
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
- }
- }
-
- /*
- * This test verifies that all undelivered messages sent to consumers
- * listening on a queue associated with a virtual topic will be forwarded to
- * separate DLQs.
- *
- * Note that the deadLetterStrategy in the broker config needs to have
- * enableAudit set to false so that duplicate messages sent from a topic to
- * individual consumers are forwarded to the DLQ:
- *
- * <deadLetterStrategy> <bean
- * xmlns="http://www.springframework.org/schema/beans"
- * class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy"
- * > <property name="useQueueForQueueMessages" value="true"></property>
- * <property name="processNonPersistent" value="true"></property> <property
- * name="processExpired" value="false"></property> <property
- * name="enableAudit" value="false"></property>
- *
- * </bean> </deadLetterStrategy>
- */
- @Test
- public void testVirtualTopicSubscriberDeadLetterQueue() throws Exception {
-
- TestConsumer consumer1 = null;
- TestConsumer consumer2 = null;
- TestConsumer consumer3 = null;
- TestConsumer dlqConsumer1 = null;
- TestConsumer dlqConsumer2 = null;
- TestConsumer dlqConsumer3 = null;
-
- try {
-
- // The first 2 consumers will rollback, ultimately causing messages
- // to land on the DLQ
- consumer1 = new TestConsumer(consumer1Prefix + virtualTopicName, false, numberMessages, true);
- thread(consumer1, false);
-
- consumer2 = new TestConsumer(consumer2Prefix + virtualTopicName, false, numberMessages, true);
- thread(consumer2, false);
-
- // TestConsumer that does not throw exceptions, messages should not
- // land on DLQ
- consumer3 = new TestConsumer(consumer3Prefix + virtualTopicName, false, numberMessages, false);
- thread(consumer3, false);
-
- // TestConsumer to read the expected Dead Letter Queue
- dlqConsumer1 = new TestConsumer(dlqPrefix + consumer1Prefix + virtualTopicName, false, numberMessages, false);
- thread(dlqConsumer1, false);
-
- dlqConsumer2 = new TestConsumer(dlqPrefix + consumer2Prefix + virtualTopicName, false, numberMessages, false);
- thread(dlqConsumer2, false);
-
- dlqConsumer3 = new TestConsumer(dlqPrefix + consumer3Prefix + virtualTopicName, false, numberMessages, false);
- thread(dlqConsumer3, false);
-
- // Give the consumers a second to start
- Thread.sleep(1000);
-
- // Start the producer
- TestProducer producer = new TestProducer(virtualTopicName, true, numberMessages);
- thread(producer, false);
-
- assertTrue("sent all producer messages in time, count is: " + producer.getLatch().getCount(), producer.getLatch().await(10, TimeUnit.SECONDS));
- LOG.info("producer successful, count = " + producer.getLatch().getCount());
-
- assertTrue("remaining consumer1 count should be zero, is: " + consumer1.getLatch().getCount(), consumer1.getLatch().await(10, TimeUnit.SECONDS));
- LOG.info("consumer1 successful, count = " + consumer1.getLatch().getCount());
-
- assertTrue("remaining consumer2 count should be zero, is: " + consumer2.getLatch().getCount(), consumer2.getLatch().await(10, TimeUnit.SECONDS));
- LOG.info("consumer2 successful, count = " + consumer2.getLatch().getCount());
-
- assertTrue("remaining consumer3 count should be zero, is: " + consumer3.getLatch().getCount(), consumer3.getLatch().await(10, TimeUnit.SECONDS));
- LOG.info("consumer3 successful, count = " + consumer3.getLatch().getCount());
-
- assertTrue("remaining dlqConsumer1 count should be zero, is: " + dlqConsumer1.getLatch().getCount(), dlqConsumer1.getLatch().await(10, TimeUnit.SECONDS));
- LOG.info("dlqConsumer1 successful, count = " + dlqConsumer1.getLatch().getCount());
-
- assertTrue("remaining dlqConsumer2 count should be zero, is: " + dlqConsumer2.getLatch().getCount(), dlqConsumer2.getLatch().await(10, TimeUnit.SECONDS));
- LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount());
-
- assertTrue("remaining dlqConsumer3 count should be " + numberMessages + ", is: " + dlqConsumer3.getLatch().getCount(), dlqConsumer3.getLatch().getCount() == numberMessages);
- LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount());
-
- }
- catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- finally {
- // Tell consumers to stop (don't read any more messages after this)
- if (consumer1 != null)
- consumer1.setStop(true);
- if (consumer2 != null)
- consumer2.setStop(true);
- if (consumer3 != null)
- consumer3.setStop(true);
- if (dlqConsumer1 != null)
- dlqConsumer1.setStop(true);
- if (dlqConsumer2 != null)
- dlqConsumer2.setStop(true);
- if (dlqConsumer3 != null)
- dlqConsumer3.setStop(true);
- }
- }
-
- private static Thread thread(Runnable runnable, boolean daemon) {
- Thread brokerThread = new Thread(runnable);
- brokerThread.setDaemon(daemon);
- brokerThread.start();
- return brokerThread;
- }
-
- private class TestProducer implements Runnable {
-
- private String destinationName = null;
- private boolean isTopic = true;
- private int numberMessages = 0;
- private CountDownLatch latch = null;
-
- public TestProducer(String destinationName, boolean isTopic, int numberMessages) {
- this.destinationName = destinationName;
- this.isTopic = isTopic;
- this.numberMessages = numberMessages;
- latch = new CountDownLatch(numberMessages);
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- @Override
- public void run() {
- ActiveMQConnectionFactory connectionFactory = null;
- ActiveMQConnection connection = null;
- ActiveMQSession session = null;
- Destination destination = null;
-
- try {
- LOG.info("Started TestProducer for destination (" + destinationName + ")");
-
- connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
- connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
- session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- if (isTopic) {
- destination = session.createTopic(this.destinationName);
- }
- else {
- destination = session.createQueue(this.destinationName);
- }
-
- // Create a MessageProducer from the Session to the Topic or
- // Queue
- ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < numberMessages; i++) {
- TextMessage message = session.createTextMessage("I am a message :: " + String.valueOf(i));
- try {
- producer.send(message);
-
- }
- catch (Exception deeperException) {
- LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException);
- }
-
- latch.countDown();
- Thread.sleep(1000);
- }
-
- LOG.info("Finished TestProducer for destination (" + destinationName + ")");
-
- }
- catch (Exception e) {
- LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e);
- e.printStackTrace();
-
- }
- finally {
- try {
- // Clean up
- if (session != null)
- session.close();
- if (connection != null)
- connection.close();
-
- }
- catch (Exception e) {
- e.printStackTrace();
- LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
- }
- }
- }
- }
-
- private class TestConsumer implements Runnable, ExceptionListener, MessageListener {
-
- private String destinationName = null;
- private boolean isTopic = true;
- private CountDownLatch latch = null;
- private int maxRedeliveries = 0;
- private int receivedMessageCounter = 0;
- private boolean bFakeFail = false;
- private boolean bStop = false;
-
- private ActiveMQConnectionFactory connectionFactory = null;
- private ActiveMQConnection connection = null;
- private Session session = null;
- private MessageConsumer consumer = null;
-
- public TestConsumer(String destinationName, boolean isTopic, int expectedNumberMessages, boolean bFakeFail) {
- this.destinationName = destinationName;
- this.isTopic = isTopic;
- latch = new CountDownLatch(expectedNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1));
- this.bFakeFail = bFakeFail;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- @Override
- public void run() {
-
- try {
- LOG.info("Started TestConsumer for destination (" + destinationName + ")");
-
- connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
- connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
- RedeliveryPolicy policy = connection.getRedeliveryPolicy();
- policy.setInitialRedeliveryDelay(1);
- policy.setUseExponentialBackOff(false);
- policy.setMaximumRedeliveries(maxRedeliveries);
-
- connection.setExceptionListener(this);
-
- Destination destination = null;
- if (isTopic) {
- destination = session.createTopic(destinationName);
- }
- else {
- destination = session.createQueue(destinationName);
- }
-
- consumer = session.createConsumer(destination);
- consumer.setMessageListener(this);
-
- while (!bStop) {
- Thread.sleep(100);
- }
-
- LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount() + " messages " + this.toString());
-
- }
- catch (Exception e) {
- LOG.error("Consumer (" + destinationName + ") Caught: " + e);
- e.printStackTrace();
- }
- finally {
- try {
- // Clean up
- if (consumer != null)
- consumer.close();
- if (session != null)
- session.close();
- if (connection != null)
- connection.close();
-
- }
- catch (Exception e) {
- e.printStackTrace();
- LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
- }
- }
- }
-
- @Override
- public synchronized void onException(JMSException ex) {
- ex.printStackTrace();
- LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occurred. Shutting down client.");
- }
-
- public synchronized void setStop(boolean bStop) {
- this.bStop = bStop;
- }
-
- @Override
- public synchronized void onMessage(Message message) {
- receivedMessageCounter++;
- latch.countDown();
-
- LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() + " :: Number messages received " + this.receivedMessageCounter);
-
- try {
- LOG.info("Consumer for destination (" + destinationName + ") Received message id :: " + message.getJMSMessageID());
-
- if (!bFakeFail) {
- LOG.info("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString());
- session.commit();
- }
- else {
- LOG.info("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString());
- session.rollback(); // rolls back all the consumed messages
- // on this session
- }
-
- }
- catch (JMSException ex) {
- ex.printStackTrace();
- LOG.error("Error reading JMS Message from destination " + destinationName + ".");
- }
- }
- }
-
- private static void purgeDestination(String destination) throws Exception {
- final Queue dest = (Queue) ((RegionBroker) broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(new ActiveMQQueue(destination));
- dest.purge();
- assertEquals(0, dest.getDestinationStatistics().getMessages().getCount());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
deleted file mode 100644
index 925b82c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.apache.activemq.xbean.XBeanBrokerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test case for https://issues.apache.org/jira/browse/AMQ-3004
- */
-
-public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class);
- protected Connection connection;
-
- public void testVirtualTopicSelectorDisconnect() throws Exception {
- testVirtualTopicDisconnect("odd = 'no'", 3000, 1500);
- }
-
- public void testVirtualTopicNoSelectorDisconnect() throws Exception {
- testVirtualTopicDisconnect(null, 3000, 3000);
- }
-
- public void testVirtualTopicDisconnect(String messageSelector, int total, int expected) throws Exception {
- if (connection == null) {
- connection = createConnection();
- }
- connection.start();
-
- final ConsumerBean messageList = new ConsumerBean();
-
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- Destination producerDestination = getProducerDestination();
- Destination destination = getConsumerDsetination();
-
- LOG.info("Sending to: " + producerDestination);
- LOG.info("Consuming from: " + destination);
-
- MessageConsumer consumer = createConsumer(session, destination, messageSelector);
-
- MessageListener listener = new MessageListener() {
- @Override
- public void onMessage(Message message) {
- messageList.onMessage(message);
- try {
- message.acknowledge();
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
- };
-
- consumer.setMessageListener(listener);
-
- // create topic producer
- MessageProducer producer = session.createProducer(producerDestination);
- assertNotNull(producer);
-
- int disconnectCount = total / 3;
- int reconnectCount = (total * 2) / 3;
-
- for (int i = 0; i < total; i++) {
- producer.send(createMessage(session, i));
-
- if (i == disconnectCount) {
- consumer.close();
- }
- if (i == reconnectCount) {
- consumer = createConsumer(session, destination, messageSelector);
- consumer.setMessageListener(listener);
- }
- }
-
- assertMessagesArrived(messageList, expected, 10000);
- }
-
- protected Destination getConsumerDsetination() {
- return new ActiveMQQueue("Consumer.VirtualTopic.TEST");
- }
-
- protected Destination getProducerDestination() {
- return new ActiveMQTopic("VirtualTopic.TEST");
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- }
-
- protected MessageConsumer createConsumer(Session session,
- Destination destination,
- String messageSelector) throws JMSException {
- if (messageSelector != null) {
- return session.createConsumer(destination, messageSelector);
- }
- else {
- return session.createConsumer(destination);
- }
- }
-
- protected TextMessage createMessage(Session session, int i) throws JMSException {
- TextMessage textMessage = session.createTextMessage("message: " + i);
- if (i % 2 != 0) {
- textMessage.setStringProperty("odd", "yes");
- }
- else {
- textMessage.setStringProperty("odd", "no");
- }
- textMessage.setIntProperty("i", i);
- return textMessage;
- }
-
- protected void assertMessagesArrived(ConsumerBean messageList, int expected, long timeout) {
- messageList.assertMessagesArrived(expected, timeout);
-
- messageList.flushMessages();
-
- LOG.info("validate no other messages on queues");
- try {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Destination destination1 = getConsumerDsetination();
-
- MessageConsumer c1 = session.createConsumer(destination1, null);
- c1.setMessageListener(messageList);
-
- LOG.info("send one simple message that should go to both consumers");
- MessageProducer producer = session.createProducer(getProducerDestination());
- assertNotNull(producer);
-
- producer.send(session.createTextMessage("Last Message"));
-
- messageList.assertMessagesArrived(1);
-
- }
- catch (JMSException e) {
- e.printStackTrace();
- fail("unexpeced ex while waiting for last messages: " + e);
- }
- }
-
- protected String getBrokerConfigUri() {
- return "org/apache/activemq/broker/virtual/disconnected-selector.xml";
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- XBeanBrokerFactory factory = new XBeanBrokerFactory();
- BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
- return answer;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java
deleted file mode 100644
index 0f2af0a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.util.Vector;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-
-/**
- *
- *
- */
-public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
-
- private Vector<Connection> connections = new Vector<>();
- public int ackMode = Session.AUTO_ACKNOWLEDGE;
-
- public static Test suite() {
- return suite(VirtualTopicPubSubTest.class);
- }
-
- public void initCombosForTestVirtualTopicCreation() {
- addCombinationValues("ackMode", new Object[]{new Integer(Session.AUTO_ACKNOWLEDGE), new Integer(Session.CLIENT_ACKNOWLEDGE)});
- }
-
- private boolean doneTwice = false;
-
- public void testVirtualTopicCreation() throws Exception {
- doTestVirtualTopicCreation(10);
- }
-
- public void doTestVirtualTopicCreation(int total) throws Exception {
-
- ConsumerBean messageList = new ConsumerBean() {
- @Override
- public synchronized void onMessage(Message message) {
- super.onMessage(message);
- if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
- try {
- message.acknowledge();
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
-
- }
- };
- messageList.setVerbose(true);
-
- String queueAName = getVirtualTopicConsumerName();
- // create consumer 'cluster'
- ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
- ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
-
- Session session = createStartAndTrackConnection().createSession(false, ackMode);
- MessageConsumer c1 = session.createConsumer(queue1);
-
- session = createStartAndTrackConnection().createSession(false, ackMode);
- MessageConsumer c2 = session.createConsumer(queue2);
-
- c1.setMessageListener(messageList);
- c2.setMessageListener(messageList);
-
- // create topic producer
- Session producerSession = createStartAndTrackConnection().createSession(false, ackMode);
- MessageProducer producer = producerSession.createProducer(new ActiveMQTopic(getVirtualTopicName()));
- assertNotNull(producer);
-
- for (int i = 0; i < total; i++) {
- producer.send(producerSession.createTextMessage("message: " + i));
- }
-
- messageList.assertMessagesArrived(total);
-
- // do twice so we confirm messages do not get redelivered after client acknowledgement
- if (doneTwice == false) {
- doneTwice = true;
- doTestVirtualTopicCreation(0);
- }
- }
-
- private Connection createStartAndTrackConnection() throws Exception {
- Connection connection = createConnection();
- connection.start();
- connections.add(connection);
- return connection;
- }
-
- protected String getVirtualTopicName() {
- return "VirtualTopic.TEST";
- }
-
- protected String getVirtualTopicConsumerName() {
- return "Consumer.A.VirtualTopic.TEST";
- }
-
- @Override
- protected void tearDown() throws Exception {
- for (Connection connection : connections) {
- connection.close();
- }
- super.tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java
deleted file mode 100644
index 1d7ea71..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import java.net.URI;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.xbean.XBeanBrokerFactory;
-
-/**
- *
- *
- */
-public class VirtualTopicPubSubUsingXBeanTest extends VirtualTopicPubSubTest {
-
- @Override
- protected String getVirtualTopicConsumerName() {
- return "VirtualTopicConsumers.ConsumerNumberOne.FOO";
- }
-
- @Override
- protected String getVirtualTopicName() {
- return "FOO";
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- XBeanBrokerFactory factory = new XBeanBrokerFactory();
- BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
-
- // lets disable persistence as we are a test
- answer.setPersistent(false);
-
- return answer;
- }
-
- protected String getBrokerConfigUri() {
- return "org/apache/activemq/broker/virtual/global-virtual-topics.xml";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
deleted file mode 100644
index d94dd18..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VirtualTopicSelectorTest extends CompositeTopicTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorTest.class);
-
- @Override
- protected Destination getConsumer1Dsetination() {
- return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
- }
-
- @Override
- protected Destination getConsumer2Dsetination() {
- return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
- }
-
- @Override
- protected Destination getProducerDestination() {
- return new ActiveMQTopic("VirtualTopic.TEST");
- }
-
- @Override
- protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
- messageList1.assertMessagesArrived(total / 2);
- messageList2.assertMessagesArrived(total / 2);
-
- messageList1.flushMessages();
- messageList2.flushMessages();
-
- LOG.info("validate no other messages on queues");
- try {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Destination destination1 = getConsumer1Dsetination();
- Destination destination2 = getConsumer2Dsetination();
- MessageConsumer c1 = session.createConsumer(destination1, null);
- MessageConsumer c2 = session.createConsumer(destination2, null);
- c1.setMessageListener(messageList1);
- c2.setMessageListener(messageList2);
-
- LOG.info("send one simple message that should go to both consumers");
- MessageProducer producer = session.createProducer(getProducerDestination());
- assertNotNull(producer);
-
- producer.send(session.createTextMessage("Last Message"));
-
- messageList1.assertMessagesArrived(1);
- messageList2.assertMessagesArrived(1);
-
- }
- catch (JMSException e) {
- e.printStackTrace();
- fail("unexpeced ex while waiting for last messages: " + e);
- }
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- // use message selectors on consumers that need to propagate up to the virtual
- // topic dispatch so that un matched messages do not linger on subscription queues
- messageSelector1 = "odd = 'yes'";
- messageSelector2 = "odd = 'no'";
-
- BrokerService broker = new BrokerService();
- broker.setPersistent(false);
-
- VirtualTopic virtualTopic = new VirtualTopic();
- // the new config that enables selectors on the intercepter
- virtualTopic.setSelectorAware(true);
- VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
- interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
- broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
- return broker;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java
deleted file mode 100644
index 4abf811..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.virtual;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.broker.jmx.MBeanTest;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-
-public class VirtualTopicsAndDurableSubsTest extends MBeanTest {
-
- private Connection connection;
-
- public void testVirtualTopicCreationAndDurableSubs() throws Exception {
- if (connection == null) {
- connection = createConnection();
- }
- connection.setClientID(getAClientID());
- connection.start();
-
- ConsumerBean messageList = new ConsumerBean();
- messageList.setVerbose(true);
-
- String queueAName = getVirtualTopicConsumerName();
- // create consumer 'cluster'
- ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
- ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c1 = session.createConsumer(queue1);
- MessageConsumer c2 = session.createConsumer(queue2);
-
- c1.setMessageListener(messageList);
- c2.setMessageListener(messageList);
-
- // create topic producer
- MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName()));
- assertNotNull(producer);
-
- int total = 10;
- for (int i = 0; i < total; i++) {
- producer.send(session.createTextMessage("message: " + i));
- }
- messageList.assertMessagesArrived(total);
-
- //Add and remove durable subscriber after using VirtualTopics
- assertCreateAndDestroyDurableSubscriptions();
- }
-
- protected String getAClientID() {
- return "VirtualTopicCreationAndDurableSubs";
- }
-
- protected String getVirtualTopicName() {
- return "VirtualTopic.TEST";
- }
-
- protected String getVirtualTopicConsumerName() {
- return "Consumer.A.VirtualTopic.TEST";
- }
-
- protected String getDurableSubscriberName() {
- return "Sub1";
- }
-
- protected String getDurableSubscriberTopicName() {
- return "simple.topic";
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- }
- super.tearDown();
- }
-
- //Overrides test cases from MBeanTest to avoid having them run.
- @Override
- public void testMBeans() throws Exception {
- }
-
- @Override
- public void testMoveMessages() throws Exception {
- }
-
- @Override
- public void testRetryMessages() throws Exception {
- }
-
- @Override
- public void testMoveMessagesBySelector() throws Exception {
- }
-
- @Override
- public void testCopyMessagesBySelector() throws Exception {
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml
deleted file mode 100644
index ed3bc73..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
- <broker persistent="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core">
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <compositeQueue name="MY.QUEUE">
- <forwardTo>
- <queue physicalName="FOO" />
- <topic physicalName="BAR" />
- </forwardTo>
- </compositeQueue>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
-
- </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml
deleted file mode 100644
index ded6471..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
- <broker xmlns="http://activemq.apache.org/schema/core">
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <compositeTopic name="MY.TOPIC">
- <forwardTo>
- <queue physicalName="FOO" />
- <topic physicalName="BAR" />
- </forwardTo>
- </compositeTopic>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
-
- </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml
deleted file mode 100644
index 2772910..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
- <broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <virtualTopic name="VirtualTopic.>" prefix="Consumer." selectorAware="true"/>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
- <plugins>
- <virtualSelectorCacheBrokerPlugin persistFile = "target/selectorcache.data"/>
- </plugins>
- </broker>
-</beans>
-<!-- END SNIPPET: xbean -->
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml
deleted file mode 100644
index d51f03c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
- <broker xmlns="http://activemq.apache.org/schema/core">
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <compositeQueue name="MY.QUEUE">
- <forwardTo>
- <filteredDestination selector="odd = 'yes'" queue="FOO"/>
- <filteredDestination selector="i = 5" topic="BAR"/>
- </forwardTo>
- </compositeQueue>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
-
- </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml
deleted file mode 100644
index ddd0667..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
- <broker xmlns="http://activemq.apache.org/schema/core">
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
-
- </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml
deleted file mode 100644
index d725436..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml
+++ /dev/null
@@ -1,80 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<!-- START SNIPPET: example -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-
-
- <!--
- The <broker> element is used to configure the ActiveMQ broker.
- -->
- <broker xmlns="http://activemq.apache.org/schema/core" brokerName="bcBroker">
-
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." />
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
-
- <destinationPolicy>
- <policyMap>
- <policyEntries>
- <policyEntry queue=">" memoryLimit="128 mb" >
- <deadLetterStrategy>
- <bean xmlns="http://www.springframework.org/schema/beans"
- class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy">
- <property name="useQueueForQueueMessages" value="true"></property>
- <property name="processNonPersistent" value="true"></property>
- <property name="processExpired" value="false"></property>
- <property name="enableAudit" value="false"></property>
-
- </bean>
- </deadLetterStrategy>
- </policyEntry>
- <policyEntry topic=">" memoryLimit="128 mb" >
- <deadLetterStrategy>
- <bean xmlns="http://www.springframework.org/schema/beans"
- class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy">
- <property name="useQueueForQueueMessages" value="true"></property>
- <property name="processNonPersistent" value="true"></property>
- <property name="processExpired" value="false"></property>
- <property name="enableAudit" value="false"></property>
-
- </bean>
- </deadLetterStrategy>
- </policyEntry>
- </policyEntries>
- </policyMap>
- </destinationPolicy>
-
- <managementContext>
- <managementContext createConnector="false"/>
- </managementContext>
-
- </broker>
-
-
-
-</beans>
-<!-- END SNIPPET: example -->
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml
deleted file mode 100644
index fcce72e..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!-- this file can only be parsed using the xbean-spring library -->
-<!-- START SNIPPET: xbean -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
-
- <broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
-
-
- <destinationInterceptors>
- <!-- custom destination interceptor -->
- <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.broker.virtual.DestinationInterceptorDurableSubTest$SimpleDestinationInterceptor" />
-
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
-
- <managementContext>
- <managementContext createConnector="true" connectorPort="1299"/>
- </managementContext>
- </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java
deleted file mode 100644
index 0568757..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-
-/**
- * An AMQ-1282 Test
- */
-public class AMQ1282 extends TestCase {
-
- private ConnectionFactory factory;
- private Connection connection;
- private MapMessage message;
-
- @Override
- protected void setUp() throws Exception {
- factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
- connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- message = session.createMapMessage();
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- connection.close();
- super.tearDown();
- }
-
- public void testUnmappedBooleanMessage() throws JMSException {
- Object expected;
- try {
- expected = Boolean.valueOf(null);
- }
- catch (Exception ex) {
- expected = ex;
- }
- try {
- Boolean actual = message.getBoolean("foo");
- assertEquals(expected, actual);
- }
- catch (Exception ex) {
- assertEquals(expected, ex);
- }
- }
-
- public void testUnmappedIntegerMessage() throws JMSException {
- Object expected;
- try {
- expected = Integer.valueOf(null);
- }
- catch (Exception ex) {
- expected = ex;
- }
- try {
- Integer actual = message.getInt("foo");
- assertEquals(expected, actual);
- }
- catch (Exception ex) {
- Class<?> aClass = expected.getClass();
- assertTrue(aClass.isInstance(ex));
- }
- }
-
- public void testUnmappedShortMessage() throws JMSException {
- Object expected;
- try {
- expected = Short.valueOf(null);
- }
- catch (Exception ex) {
- expected = ex;
- }
- try {
- Short actual = message.getShort("foo");
- assertEquals(expected, actual);
- }
- catch (Exception ex) {
- Class<?> aClass = expected.getClass();
- assertTrue(aClass.isInstance(ex));
- }
- }
-
- public void testUnmappedLongMessage() throws JMSException {
- Object expected;
- try {
- expected = Long.valueOf(null);
- }
- catch (Exception ex) {
- expected = ex;
- }
- try {
- Long actual = message.getLong("foo");
- assertEquals(expected, actual);
- }
- catch (Exception ex) {
- Class<?> aClass = expected.getClass();
- assertTrue(aClass.isInstance(ex));
- }
- }
-
- public void testUnmappedStringMessage() throws JMSException {
- Object expected;
- try {
- expected = String.valueOf(null);
- }
- catch (Exception ex) {
- expected = ex;
- }
- try {
- String actual = message.getString("foo");
- assertEquals(expected, actual);
- }
- catch (Exception ex) {
- Class<?> aClass = expected.getClass();
- assertTrue(aClass.isInstance(ex));
- }
- }
-
- public void testUnmappedCharMessage() throws JMSException {
- try {
- message.getChar("foo");
- fail("should have thrown NullPointerException");
- }
- catch (NullPointerException success) {
- assertNotNull(success);
- }
- }
-
- public void testUnmappedByteMessage() throws JMSException {
- Object expected;
- try {
- expected = Byte.valueOf(null);
- }
- catch (Exception ex) {
- expected = ex;
- }
- try {
- Byte actual = message.getByte("foo");
- assertEquals(expected, actual);
- }
- catch (Exception ex) {
- Class<?> aClass = expected.getClass();
- assertTrue(aClass.isInstance(ex));
- }
- }
-
- public void testUnmappedDoubleMessage() throws JMSException {
- Object expected;
- try {
- expected = Double.valueOf(null);
- }
- catch (Exception ex) {
- expected = ex;
- }
- try {
- Double actual = message.getDouble("foo");
- assertEquals(expected, actual);
- }
- catch (Exception ex) {
- Class<?> aClass = expected.getClass();
- assertTrue(aClass.isInstance(ex));
- }
- }
-
- public void testUnmappedFloatMessage() throws JMSException {
- Object expected;
- try {
- expected = Float.valueOf(null);
- }
- catch (Exception ex) {
- expected = ex;
- }
- try {
- Float actual = message.getFloat("foo");
- assertEquals(expected, actual);
- }
- catch (Exception ex) {
- Class<?> aClass = expected.getClass();
- assertTrue(aClass.isInstance(ex));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
deleted file mode 100644
index 78a6088..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-
-/**
- *
- *
- */
-public class AMQ1687Test extends EmbeddedBrokerTestSupport {
-
- private Connection connection;
-
- @Override
- protected ConnectionFactory createConnectionFactory() throws Exception {
- //prefetch change is not required, but test will not fail w/o it, only spew errors in the AMQ log.
- return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + "?jms.prefetchPolicy.all=5");
- }
-
- public void testVirtualTopicCreation() throws Exception {
- if (connection == null) {
- connection = createConnection();
- }
- connection.start();
-
- ConsumerBean messageList = new ConsumerBean();
- messageList.setVerbose(true);
-
- String queueAName = getVirtualTopicConsumerName();
- String queueBName = getVirtualTopicConsumerNameB();
-
- // create consumer 'cluster'
- ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
- ActiveMQQueue queue2 = new ActiveMQQueue(queueBName);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c1 = session.createConsumer(queue1);
- MessageConsumer c2 = session.createConsumer(queue2);
-
- c1.setMessageListener(messageList);
- c2.setMessageListener(messageList);
-
- // create topic producer
- ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName());
- MessageProducer producer = session.createProducer(topic);
- assertNotNull(producer);
-
- int total = 100;
- for (int i = 0; i < total; i++) {
- producer.send(session.createTextMessage("message: " + i));
- }
-
- messageList.assertMessagesArrived(total * 2);
- }
-
- protected String getVirtualTopicName() {
- return "VirtualTopic.TEST";
- }
-
- protected String getVirtualTopicConsumerName() {
- return "Consumer.A.VirtualTopic.TEST";
- }
-
- protected String getVirtualTopicConsumerNameB() {
- return "Consumer.B.VirtualTopic.TEST";
- }
-
- @Override
- protected void setUp() throws Exception {
- this.bindAddress = "tcp://localhost:0";
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- }
- super.tearDown();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java
deleted file mode 100644
index 2f7b8fe..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.*;
-
-import java.net.URI;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test validates that the AMQ consumer blocks on redelivery of a message,
- * through all redeliveries, until the message is either successfully consumed
- * or sent to the DLQ.
- */
-public class AMQ1853Test {
-
- private static BrokerService broker;
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ1853Test.class);
- static final String jmsConnectionURI = "failover:(vm://localhost)";
-
- // Virtual Topic that the test publishes 10 messages to
- private static final String queueFail = "Queue.BlockingConsumer.QueueFail";
-
- // Number of messages
-
- private final int producerMessages = 5;
- private final int totalNumberMessages = producerMessages * 2;
- private final int maxRedeliveries = 2;
- private final int redeliveryDelay = 1000;
-
- private Map<String, AtomicInteger> messageList = null;
-
- @Before
- public void setUp() throws Exception {
- broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
- broker.setUseJmx(false);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
- }
- }
-
- @Test
- public void testConsumerMessagesAreNotOrdered() throws Exception {
-
- TestConsumer consumerAllFail = null;
- messageList = new Hashtable<>();
-
- try {
-
- // The first 2 consumers will rollback, ultimately causing messages to land on the DLQ
-
- TestProducer producerAllFail = new TestProducer(queueFail);
- thread(producerAllFail, false);
-
- consumerAllFail = new TestConsumer(queueFail, true);
- thread(consumerAllFail, false);
-
- // Give the consumers a second to start
- Thread.sleep(1000);
-
- thread(producerAllFail, false);
-
- // Give the consumers a second to start
- Thread.sleep(1000);
-
- producerAllFail.getLatch().await();
-
- LOG.info("producer successful, count = " + producerAllFail.getLatch().getCount());
- LOG.info("final message list size = " + messageList.size());
-
- assertTrue("message list size = " + messageList.size() + " exptected:" + totalNumberMessages, Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return totalNumberMessages == messageList.size();
- }
- }));
-
- consumerAllFail.getLatch().await();
-
- LOG.info("consumerAllFail successful, count = " + consumerAllFail.getLatch().getCount());
-
- Iterator<String> keys = messageList.keySet().iterator();
- for (AtomicInteger counter : messageList.values()) {
- String message = keys.next();
- LOG.info("final count for message " + message + " counter = " + counter.get());
- assertTrue("for message " + message + " counter = " + counter.get(), counter.get() == maxRedeliveries + 1);
- }
-
- assertFalse(consumerAllFail.messageReceiptIsOrdered());
- }
- finally {
- if (consumerAllFail != null) {
- consumerAllFail.setStop(true);
- }
- }
- }
-
- private static Thread thread(Runnable runnable, boolean daemon) {
- Thread brokerThread = new Thread(runnable);
- brokerThread.setDaemon(daemon);
- brokerThread.start();
- return brokerThread;
- }
-
- private class TestProducer implements Runnable {
-
- private CountDownLatch latch = null;
- private String destinationName = null;
-
- public TestProducer(String destinationName) {
- this.destinationName = destinationName;
- // We run the producer 2 times
- latch = new CountDownLatch(totalNumberMessages);
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- @Override
- public void run() {
-
- ActiveMQConnectionFactory connectionFactory = null;
- ActiveMQConnection connection = null;
- ActiveMQSession session = null;
- Destination destination = null;
-
- try {
- LOG.info("Started TestProducer for destination (" + destinationName + ")");
-
- connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
- connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.setCopyMessageOnSend(false);
- connection.start();
- session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- destination = session.createQueue(this.destinationName);
-
- // Create a MessageProducer from the Session to the Topic or Queue
- ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < (producerMessages); i++) {
- TextMessage message = session.createTextMessage();
- message.setLongProperty("TestTime", (System.currentTimeMillis()));
- try {
- producer.send(message);
- LOG.info("Producer (" + destinationName + ")\n" + message.getJMSMessageID() + " = sent messageId\n");
-
- latch.countDown();
- LOG.info(" Latch count " + latch.getCount());
- LOG.info("Producer message list size = " + messageList.keySet().size());
- messageList.put(message.getJMSMessageID(), new AtomicInteger(0));
- LOG.info("Producer message list size = " + messageList.keySet().size());
-
- }
- catch (Exception deeperException) {
- LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException);
- }
-
- Thread.sleep(1000);
- }
-
- LOG.info("Finished TestProducer for destination (" + destinationName + ")");
-
- }
- catch (Exception e) {
- LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e);
- }
- finally {
- try {
- if (session != null) {
- session.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- catch (Exception e) {
- LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
- }
- }
- }
- }
-
- private class TestConsumer implements Runnable, ExceptionListener, MessageListener {
-
- private CountDownLatch latch = null;
- private int receivedMessageCounter = 0;
- private boolean bFakeFail = false;
- String destinationName = null;
- boolean bMessageReceiptIsOrdered = true;
- boolean bStop = false;
- String previousMessageId = null;
-
- private ActiveMQConnectionFactory connectionFactory = null;
- private ActiveMQConnection connection = null;
- private Session session = null;
- private MessageConsumer consumer = null;
-
- public TestConsumer(String destinationName, boolean bFakeFail) {
- this.bFakeFail = bFakeFail;
- latch = new CountDownLatch(totalNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1));
- this.destinationName = destinationName;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- public boolean messageReceiptIsOrdered() {
- return bMessageReceiptIsOrdered;
- }
-
- @Override
- public void run() {
-
- try {
- LOG.info("Started TestConsumer for destination (" + destinationName + ")");
-
- connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
- connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.setNonBlockingRedelivery(true);
- session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
- RedeliveryPolicy policy = connection.getRedeliveryPolicy();
- policy.setInitialRedeliveryDelay(redeliveryDelay);
- policy.setBackOffMultiplier(-1);
- policy.setRedeliveryDelay(redeliveryDelay);
- policy.setMaximumRedeliveryDelay(-1);
- policy.setUseExponentialBackOff(false);
- policy.setMaximumRedeliveries(maxRedeliveries);
-
- connection.setExceptionListener(this);
- Destination destination = session.createQueue(destinationName);
- consumer = session.createConsumer(destination);
- consumer.setMessageListener(this);
-
- connection.start();
-
- while (!bStop) {
- Thread.sleep(100);
- }
-
- LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount() + " messages " + this.toString());
-
- }
- catch (Exception e) {
- LOG.error("Consumer (" + destinationName + ") Caught: " + e);
- }
- finally {
- try {
- if (consumer != null) {
- consumer.close();
- }
- if (session != null) {
- session.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- catch (Exception e) {
- LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e);
- }
- }
- }
-
- @Override
- public synchronized void onException(JMSException ex) {
- LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occurred. Shutting down client.");
- }
-
- public synchronized void setStop(boolean bStop) {
- this.bStop = bStop;
- }
-
- @Override
- public synchronized void onMessage(Message message) {
- receivedMessageCounter++;
- latch.countDown();
-
- LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() +
- " :: Number messages received " + this.receivedMessageCounter);
-
- try {
-
- if (receivedMessageCounter % (maxRedeliveries + 1) == 1) {
- previousMessageId = message.getJMSMessageID();
- }
-
- if (bMessageReceiptIsOrdered) {
- bMessageReceiptIsOrdered = previousMessageId.trim().equals(message.getJMSMessageID());
- }
-
- final String jmsMessageId = message.getJMSMessageID();
- assertTrue("Did not find expected ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return messageList.containsKey(jmsMessageId);
- }
- }));
-
- AtomicInteger counter = messageList.get(jmsMessageId);
- counter.incrementAndGet();
-
- LOG.info("Consumer for destination (" + destinationName + ")\n" + message.getJMSMessageID() + " = currentMessageId\n" + previousMessageId + " = previousMessageId\n" + bMessageReceiptIsOrdered + "= bMessageReceiptIsOrdered\n" + ">>LATENCY " + (System.currentTimeMillis() - message.getLongProperty("TestTime")) + "\n" + "message counter = " + counter.get());
-
- if (!bFakeFail) {
- LOG.debug("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString());
- session.commit();
- }
- else {
- LOG.debug("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString());
- session.rollback(); // rolls back all the consumed messages on the session to
- }
-
- }
- catch (Exception ex) {
- ex.printStackTrace();
- LOG.error("Error reading JMS Message from destination " + destinationName + ".");
- }
- }
- }
-}