You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/12/08 16:18:11 UTC
[5/8] activemq-artemis git commit: Add tests for auto-created queue
default config
Add tests for auto-created queue default config
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/10fb7f6d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/10fb7f6d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/10fb7f6d
Branch: refs/heads/ARTEMIS-780
Commit: 10fb7f6dcc032ef5a9fc92feae825c8780257c73
Parents: 5b66382
Author: jbertram <jb...@apache.com>
Authored: Wed Dec 7 10:39:41 2016 -0600
Committer: jbertram <jb...@apache.com>
Committed: Thu Dec 8 08:43:37 2016 -0600
----------------------------------------------------------------------
.../tests/integration/jms/JmsProducerTest.java | 39 +
.../integration/jms/consumer/ConsumerTest.java | 742 ------------------
.../jms/consumer/JmsConsumerTest.java | 766 +++++++++++++++++++
3 files changed, 805 insertions(+), 742 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/10fb7f6d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java
index 245ac89..07ef73c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java
@@ -16,18 +16,21 @@
*/
package org.apache.activemq.artemis.tests.integration.jms;
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSProducer;
import javax.jms.MessageFormatRuntimeException;
import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Random;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSContext;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
@@ -134,6 +137,42 @@ public class JmsProducerTest extends JMSTestBase {
}
@Test
+ public void defaultAutoCreatedQueueConfigTest() throws Exception {
+ final String queueName = "q1";
+
+ server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings().setDefaultMaxConsumers(5).setDefaultDeleteOnNoConsumers(true));
+
+ Queue q1 = context.createQueue(queueName);
+
+ context.createProducer().setProperty("prop1", 1).setProperty("prop2", 2).send(q1, "Text1");
+
+ org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName));
+
+ assertEquals(5, queue.getMaxConsumers());
+ assertEquals(true, queue.isDeleteOnNoConsumers());
+ }
+
+ @Test
+ public void defaultAutoCreatedQueueConfigTest2() throws Exception {
+ final String queueName = "q1";
+
+ server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings().setDefaultMaxConsumers(5).setDefaultDeleteOnNoConsumers(true));
+
+ Connection connection = cf.createConnection();
+
+ Session session = connection.createSession();
+
+ session.createProducer(session.createQueue(queueName));
+
+ org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName));
+
+ assertEquals(5, queue.getMaxConsumers());
+ assertEquals(true, queue.isDeleteOnNoConsumers());
+
+ connection.close();
+ }
+
+ @Test
public void testDeliveryMode() {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Assert.assertEquals(DeliveryMode.PERSISTENT, producer.getDeliveryMode());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/10fb7f6d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/ConsumerTest.java
deleted file mode 100644
index 80dac25..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/ConsumerTest.java
+++ /dev/null
@@ -1,742 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.jms.consumer;
-
-import javax.jms.Connection;
-import javax.jms.JMSConsumer;
-import javax.jms.JMSContext;
-import javax.jms.JMSException;
-import javax.jms.JMSProducer;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import java.util.Enumeration;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
-import org.apache.activemq.artemis.api.jms.JMSFactoryType;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
-import org.apache.activemq.artemis.tests.util.JMSTestBase;
-import org.apache.activemq.artemis.utils.ReusableLatch;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ConsumerTest extends JMSTestBase {
-
- private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
-
- private static final String Q_NAME = "ConsumerTestQueue";
-
- private static final String T_NAME = "ConsumerTestTopic";
-
- private static final String T2_NAME = "ConsumerTestTopic2";
-
- private javax.jms.Queue jBossQueue;
- private javax.jms.Topic topic;
- private javax.jms.Topic topic2;
-
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
-
- topic = ActiveMQJMSClient.createTopic(T_NAME);
- topic2 = ActiveMQJMSClient.createTopic(T2_NAME);
-
- jmsServer.createQueue(false, ConsumerTest.Q_NAME, null, true, ConsumerTest.Q_NAME);
- jmsServer.createTopic(true, T_NAME, "/topic/" + T_NAME);
- jmsServer.createTopic(true, T2_NAME, "/topic/" + T2_NAME);
- cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- }
-
- @Test
- public void testTransactionalSessionRollback() throws Exception {
- conn = cf.createConnection();
- Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageProducer prod = sess.createProducer(topic);
- MessageConsumer cons = sess.createConsumer(topic);
-
- conn.start();
-
- TextMessage msg1 = sess.createTextMessage("m1");
- TextMessage msg2 = sess.createTextMessage("m2");
- TextMessage msg3 = sess.createTextMessage("m3");
-
- prod.send(msg1);
- sess.commit();
-
- prod.send(msg2);
- sess.rollback();
-
- prod.send(msg3);
- sess.commit();
-
- TextMessage m1 = (TextMessage) cons.receive(2000);
- Assert.assertNotNull(m1);
- Assert.assertEquals("m1", m1.getText());
-
- TextMessage m2 = (TextMessage) cons.receive(2000);
- Assert.assertNotNull(m2);
- Assert.assertEquals("m3", m2.getText());
-
- TextMessage m3 = (TextMessage) cons.receive(2000);
- Assert.assertNull("m3 should be null", m3);
-
- System.out.println("received m1: " + m1.getText());
- System.out.println("received m2: " + m2.getText());
- System.out.println("received m3: " + m3);
- sess.commit();
- }
-
- @Test
- public void testPreCommitAcks() throws Exception {
- conn = cf.createConnection();
- Session session = conn.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageProducer producer = session.createProducer(jBossQueue);
- MessageConsumer consumer = session.createConsumer(jBossQueue);
- int noOfMessages = 100;
- for (int i = 0; i < noOfMessages; i++) {
- producer.send(session.createTextMessage("m" + i));
- }
-
- conn.start();
- for (int i = 0; i < noOfMessages; i++) {
- Message m = consumer.receive(500);
- Assert.assertNotNull(m);
- }
-
- SimpleString queueName = new SimpleString(ConsumerTest.Q_NAME);
- Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
- Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
- }
-
- @Test
- public void testIndividualACK() throws Exception {
- Connection conn = cf.createConnection();
- Session session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageProducer producer = session.createProducer(jBossQueue);
- MessageConsumer consumer = session.createConsumer(jBossQueue);
- int noOfMessages = 100;
- for (int i = 0; i < noOfMessages; i++) {
- producer.send(session.createTextMessage("m" + i));
- }
-
- conn.start();
-
- // Consume even numbers first
- for (int i = 0; i < noOfMessages; i++) {
- Message m = consumer.receive(500);
- Assert.assertNotNull(m);
- if (i % 2 == 0) {
- m.acknowledge();
- }
- }
-
- session.close();
-
- session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
-
- consumer = session.createConsumer(jBossQueue);
-
- // Consume odd numbers first
- for (int i = 0; i < noOfMessages; i++) {
- if (i % 2 == 0) {
- continue;
- }
-
- TextMessage m = (TextMessage) consumer.receive(1000);
- Assert.assertNotNull(m);
- m.acknowledge();
- Assert.assertEquals("m" + i, m.getText());
- }
-
- SimpleString queueName = new SimpleString(ConsumerTest.Q_NAME);
- Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
- Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
- conn.close();
- }
-
- @Test
- public void testIndividualACKMessageConsumer() throws Exception {
- Connection conn = cf.createConnection();
- Session session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageProducer producer = session.createProducer(jBossQueue);
- MessageConsumer consumer = session.createConsumer(jBossQueue);
- int noOfMessages = 100;
- for (int i = 0; i < noOfMessages; i++) {
- producer.setPriority(2);
- producer.send(session.createTextMessage("m" + i));
- }
-
- conn.start();
-
- final AtomicInteger errors = new AtomicInteger(0);
- final ReusableLatch latch = new ReusableLatch();
- latch.setCount(noOfMessages);
-
- class MessageAckEven implements MessageListener {
-
- int count = 0;
-
- @Override
- public void onMessage(Message msg) {
- try {
- TextMessage txtmsg = (TextMessage) msg;
- if (!txtmsg.getText().equals("m" + count)) {
-
- errors.incrementAndGet();
- }
-
- if (count % 2 == 0) {
- msg.acknowledge();
- }
-
- count++;
- } catch (Exception e) {
- errors.incrementAndGet();
- } finally {
- latch.countDown();
- }
- }
-
- }
-
- consumer.setMessageListener(new MessageAckEven());
-
- Assert.assertTrue(latch.await(5000));
-
- session.close();
-
- session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
-
- consumer = session.createConsumer(jBossQueue);
-
- // Consume odd numbers first
- for (int i = 0; i < noOfMessages; i++) {
- if (i % 2 == 0) {
- continue;
- }
-
- TextMessage m = (TextMessage) consumer.receive(1000);
- Assert.assertNotNull(m);
- m.acknowledge();
- Assert.assertEquals("m" + i, m.getText());
- }
-
- SimpleString queueName = new SimpleString(ConsumerTest.Q_NAME);
- Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
- Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
- conn.close();
- }
-
- @Test
- public void testPreCommitAcksSetOnConnectionFactory() throws Exception {
- ((ActiveMQConnectionFactory) cf).setPreAcknowledge(true);
- conn = cf.createConnection();
-
- Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageProducer producer = session.createProducer(jBossQueue);
- MessageConsumer consumer = session.createConsumer(jBossQueue);
- int noOfMessages = 100;
- for (int i = 0; i < noOfMessages; i++) {
- producer.send(session.createTextMessage("m" + i));
- }
-
- conn.start();
- for (int i = 0; i < noOfMessages; i++) {
- Message m = consumer.receive(500);
- Assert.assertNotNull(m);
- }
-
- // Messages should all have been acked since we set pre ack on the cf
- SimpleString queueName = new SimpleString(ConsumerTest.Q_NAME);
- Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
- Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
- }
-
- @Test
- public void testPreCommitAcksWithMessageExpiry() throws Exception {
- ConsumerTest.log.info("starting test");
-
- conn = cf.createConnection();
- Session session = conn.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageProducer producer = session.createProducer(jBossQueue);
- MessageConsumer consumer = session.createConsumer(jBossQueue);
- int noOfMessages = 1000;
- for (int i = 0; i < noOfMessages; i++) {
- TextMessage textMessage = session.createTextMessage("m" + i);
- producer.setTimeToLive(1);
- producer.send(textMessage);
- }
-
- Thread.sleep(2);
-
- conn.start();
-
- Message m = consumer.receiveNoWait();
- Assert.assertNull(m);
-
- // Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
- // point
- // which can cause delivering count to flip to 1
-
- }
-
- @Test
- public void testPreCommitAcksWithMessageExpirySetOnConnectionFactory() throws Exception {
- ((ActiveMQConnectionFactory) cf).setPreAcknowledge(true);
- conn = cf.createConnection();
- Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageProducer producer = session.createProducer(jBossQueue);
- MessageConsumer consumer = session.createConsumer(jBossQueue);
- int noOfMessages = 1000;
- for (int i = 0; i < noOfMessages; i++) {
- TextMessage textMessage = session.createTextMessage("m" + i);
- producer.setTimeToLive(1);
- producer.send(textMessage);
- }
-
- Thread.sleep(2);
-
- conn.start();
- Message m = consumer.receiveNoWait();
- Assert.assertNull(m);
-
- // Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
- // point
- // which can cause delivering count to flip to 1
- }
-
- @Test
- public void testBrowserAndConsumerSimultaneous() throws Exception {
- ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
- conn = cf.createConnection();
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageProducer producer = session.createProducer(jBossQueue);
-
- QueueBrowser browser = session.createBrowser(jBossQueue);
- Enumeration enumMessages = browser.getEnumeration();
-
- MessageConsumer consumer = session.createConsumer(jBossQueue);
- int noOfMessages = 10;
- for (int i = 0; i < noOfMessages; i++) {
- TextMessage textMessage = session.createTextMessage("m" + i);
- textMessage.setIntProperty("i", i);
- producer.send(textMessage);
- }
-
- conn.start();
- for (int i = 0; i < noOfMessages; i++) {
- TextMessage msg = (TextMessage) enumMessages.nextElement();
- Assert.assertNotNull(msg);
- Assert.assertEquals(i, msg.getIntProperty("i"));
-
- conn.start();
- TextMessage recvMessage = (TextMessage) consumer.receiveNoWait();
- Assert.assertNotNull(recvMessage);
- conn.stop();
- Assert.assertEquals(i, msg.getIntProperty("i"));
- }
-
- Assert.assertNull(consumer.receiveNoWait());
- Assert.assertFalse(enumMessages.hasMoreElements());
-
- conn.close();
-
- // Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
- // point
- // which can cause delivering count to flip to 1
- }
-
- @Test
- public void testBrowserAndConsumerSimultaneousDifferentConnections() throws Exception {
- ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
- conn = cf.createConnection();
-
- Connection connConsumer = cf.createConnection();
- Session sessionConsumer = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageProducer producer = session.createProducer(jBossQueue);
- MessageConsumer consumer = sessionConsumer.createConsumer(jBossQueue);
- int noOfMessages = 1000;
- for (int i = 0; i < noOfMessages; i++) {
- TextMessage textMessage = session.createTextMessage("m" + i);
- textMessage.setIntProperty("i", i);
- producer.send(textMessage);
- }
-
- connConsumer.start();
-
- QueueBrowser browser = session.createBrowser(jBossQueue);
- Enumeration enumMessages = browser.getEnumeration();
-
- for (int i = 0; i < noOfMessages; i++) {
- TextMessage msg = (TextMessage) enumMessages.nextElement();
- Assert.assertNotNull(msg);
- Assert.assertEquals(i, msg.getIntProperty("i"));
-
- TextMessage recvMessage = (TextMessage) consumer.receiveNoWait();
- Assert.assertNotNull(recvMessage);
- Assert.assertEquals(i, msg.getIntProperty("i"));
- }
-
- Message m = consumer.receiveNoWait();
- Assert.assertFalse(enumMessages.hasMoreElements());
- Assert.assertNull(m);
-
- conn.close();
- }
-
- @Test
- public void testBrowserOnly() throws Exception {
- ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
- conn = cf.createConnection();
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageProducer producer = session.createProducer(jBossQueue);
- int noOfMessages = 10;
- for (int i = 0; i < noOfMessages; i++) {
- TextMessage textMessage = session.createTextMessage("m" + i);
- textMessage.setIntProperty("i", i);
- producer.send(textMessage);
- }
-
- QueueBrowser browser = session.createBrowser(jBossQueue);
- Enumeration enumMessages = browser.getEnumeration();
-
- for (int i = 0; i < noOfMessages; i++) {
- Assert.assertTrue(enumMessages.hasMoreElements());
- TextMessage msg = (TextMessage) enumMessages.nextElement();
- Assert.assertNotNull(msg);
- Assert.assertEquals(i, msg.getIntProperty("i"));
-
- }
-
- Assert.assertFalse(enumMessages.hasMoreElements());
-
- conn.close();
-
- // Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
- // point
- // which can cause delivering count to flip to 1
- }
-
- @Test
- public void testClearExceptionListener() throws Exception {
- conn = cf.createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageConsumer consumer = session.createConsumer(jBossQueue);
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(final Message msg) {
- }
- });
-
- consumer.setMessageListener(null);
- consumer.receiveNoWait();
- }
-
- @Test
- public void testCantReceiveWhenListenerIsSet() throws Exception {
- conn = cf.createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
- MessageConsumer consumer = session.createConsumer(jBossQueue);
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(final Message msg) {
- }
- });
-
- try {
- consumer.receiveNoWait();
- Assert.fail("Should throw exception");
- } catch (JMSException e) {
- // Ok
- }
- }
-
- @Test
- public void testSharedConsumer() throws Exception {
- conn = cf.createConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- topic = ActiveMQJMSClient.createTopic(T_NAME);
-
- MessageConsumer cons = session.createSharedConsumer(topic, "test1");
-
- MessageProducer producer = session.createProducer(topic);
-
- producer.send(session.createTextMessage("test"));
-
- TextMessage txt = (TextMessage) cons.receive(5000);
-
- Assert.assertNotNull(txt);
- }
-
- @Test
- public void testSharedDurableConsumer() throws Exception {
- conn = cf.createConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- topic = ActiveMQJMSClient.createTopic(T_NAME);
-
- MessageConsumer cons = session.createSharedDurableConsumer(topic, "test1");
-
- MessageProducer producer = session.createProducer(topic);
-
- producer.send(session.createTextMessage("test"));
-
- TextMessage txt = (TextMessage) cons.receive(5000);
-
- Assert.assertNotNull(txt);
- }
-
- @Test
- public void testSharedDurableConsumerWithClientID() throws Exception {
- conn = cf.createConnection();
- conn.setClientID("C1");
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Connection conn2 = cf.createConnection();
- conn2.setClientID("C2");
- Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- {
- Connection conn3 = cf.createConnection();
-
- boolean exception = false;
- try {
- conn3.setClientID("C2");
- } catch (Exception e) {
- exception = true;
- }
-
- Assert.assertTrue(exception);
- conn3.close();
- }
-
- topic = ActiveMQJMSClient.createTopic(T_NAME);
-
- MessageConsumer cons = session.createSharedDurableConsumer(topic, "test1");
-
- MessageProducer producer = session.createProducer(topic);
-
- producer.send(session.createTextMessage("test"));
-
- TextMessage txt = (TextMessage) cons.receive(5000);
-
- Assert.assertNotNull(txt);
- }
-
- @Test
- public void testValidateExceptionsThroughSharedConsumers() throws Exception {
- conn = cf.createConnection();
- conn.setClientID("C1");
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Connection conn2 = cf.createConnection();
- conn2.setClientID("C2");
-
- MessageConsumer cons = session.createSharedConsumer(topic, "cons1");
- boolean exceptionHappened = false;
- try {
- MessageConsumer cons2Error = session.createSharedConsumer(topic2, "cons1");
- } catch (JMSException e) {
- exceptionHappened = true;
- }
-
- Assert.assertTrue(exceptionHappened);
-
- MessageProducer producer = session.createProducer(topic2);
-
- // This is durable, different than the one on topic... So it should go through
- MessageConsumer cons2 = session.createSharedDurableConsumer(topic2, "cons1");
-
- conn.start();
-
- producer.send(session.createTextMessage("hello!"));
-
- TextMessage msg = (TextMessage) cons2.receive(5000);
- Assert.assertNotNull(msg);
-
- exceptionHappened = false;
- try {
- session.unsubscribe("cons1");
- } catch (JMSException e) {
- exceptionHappened = true;
- }
-
- Assert.assertTrue(exceptionHappened);
- cons2.close();
- conn.close();
- conn2.close();
-
- }
-
- @Test
- public void testUnsubscribeDurable() throws Exception {
- conn = cf.createConnection();
- conn.setClientID("C1");
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = session.createSharedDurableConsumer(topic, "c1");
-
- MessageProducer prod = session.createProducer(topic);
-
- for (int i = 0; i < 100; i++) {
- prod.send(session.createTextMessage("msg" + i));
- }
-
- Assert.assertNotNull(cons.receive(5000));
-
- cons.close();
-
- session.unsubscribe("c1");
-
- cons = session.createSharedDurableConsumer(topic, "c1");
-
- // it should be null since the queue was deleted through unsubscribe
- Assert.assertNull(cons.receiveNoWait());
- }
-
- @Test
- public void testShareDurable() throws Exception {
- ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
- conn = cf.createConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = session.createSharedDurableConsumer(topic, "c1");
- MessageConsumer cons2 = session2.createSharedDurableConsumer(topic, "c1");
-
- MessageProducer prod = session.createProducer(topic);
-
- for (int i = 0; i < 100; i++) {
- prod.send(session.createTextMessage("msg" + i));
- }
-
- for (int i = 0; i < 50; i++) {
- Message msg = cons.receive(5000);
- Assert.assertNotNull(msg);
- msg = cons2.receive(5000);
- Assert.assertNotNull(msg);
- }
-
- Assert.assertNull(cons.receiveNoWait());
- Assert.assertNull(cons2.receiveNoWait());
-
- cons.close();
-
- boolean exceptionHappened = false;
-
- try {
- session.unsubscribe("c1");
- } catch (JMSException e) {
- exceptionHappened = true;
- }
-
- Assert.assertTrue(exceptionHappened);
-
- cons2.close();
-
- for (int i = 0; i < 100; i++) {
- prod.send(session.createTextMessage("msg" + i));
- }
-
- session.unsubscribe("c1");
-
- cons = session.createSharedDurableConsumer(topic, "c1");
-
- // it should be null since the queue was deleted through unsubscribe
- Assert.assertNull(cons.receiveNoWait());
- }
-
- @Test
- public void testShareDuraleWithJMSContext() throws Exception {
- ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
- JMSContext conn = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE);
-
- JMSConsumer consumer = conn.createSharedDurableConsumer(topic, "c1");
-
- JMSProducer producer = conn.createProducer();
-
- for (int i = 0; i < 100; i++) {
- producer.setProperty("count", i).send(topic, "test" + i);
- }
-
- JMSContext conn2 = conn.createContext(JMSContext.AUTO_ACKNOWLEDGE);
- JMSConsumer consumer2 = conn2.createSharedDurableConsumer(topic, "c1");
-
- for (int i = 0; i < 50; i++) {
- String txt = consumer.receiveBody(String.class, 5000);
- System.out.println("TXT:" + txt);
- Assert.assertNotNull(txt);
-
- txt = consumer.receiveBody(String.class, 5000);
- System.out.println("TXT:" + txt);
- Assert.assertNotNull(txt);
- }
-
- Assert.assertNull(consumer.receiveNoWait());
- Assert.assertNull(consumer2.receiveNoWait());
-
- boolean exceptionHappened = false;
-
- try {
- conn.unsubscribe("c1");
- } catch (Exception e) {
- e.printStackTrace();
- exceptionHappened = true;
- }
-
- Assert.assertTrue(exceptionHappened);
-
- consumer.close();
- consumer2.close();
- conn2.close();
-
- conn.unsubscribe("c1");
-
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/10fb7f6d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
new file mode 100644
index 0000000..2e76255
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
@@ -0,0 +1,766 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.consumer;
+
+import javax.jms.Connection;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Enumeration;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JmsConsumerTest extends JMSTestBase {
+
+ private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+ private static final String Q_NAME = "ConsumerTestQueue";
+
+ private static final String T_NAME = "ConsumerTestTopic";
+
+ private static final String T2_NAME = "ConsumerTestTopic2";
+
+ private javax.jms.Queue jBossQueue;
+ private javax.jms.Topic topic;
+ private javax.jms.Topic topic2;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ topic = ActiveMQJMSClient.createTopic(T_NAME);
+ topic2 = ActiveMQJMSClient.createTopic(T2_NAME);
+
+ jmsServer.createQueue(false, JmsConsumerTest.Q_NAME, null, true, JmsConsumerTest.Q_NAME);
+ jmsServer.createTopic(true, T_NAME, "/topic/" + T_NAME);
+ jmsServer.createTopic(true, T2_NAME, "/topic/" + T2_NAME);
+ cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ }
+
+ @Test
+ public void testTransactionalSessionRollback() throws Exception {
+ conn = cf.createConnection();
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = sess.createProducer(topic);
+ MessageConsumer cons = sess.createConsumer(topic);
+
+ conn.start();
+
+ TextMessage msg1 = sess.createTextMessage("m1");
+ TextMessage msg2 = sess.createTextMessage("m2");
+ TextMessage msg3 = sess.createTextMessage("m3");
+
+ prod.send(msg1);
+ sess.commit();
+
+ prod.send(msg2);
+ sess.rollback();
+
+ prod.send(msg3);
+ sess.commit();
+
+ TextMessage m1 = (TextMessage) cons.receive(2000);
+ Assert.assertNotNull(m1);
+ Assert.assertEquals("m1", m1.getText());
+
+ TextMessage m2 = (TextMessage) cons.receive(2000);
+ Assert.assertNotNull(m2);
+ Assert.assertEquals("m3", m2.getText());
+
+ TextMessage m3 = (TextMessage) cons.receive(2000);
+ Assert.assertNull("m3 should be null", m3);
+
+ System.out.println("received m1: " + m1.getText());
+ System.out.println("received m2: " + m2.getText());
+ System.out.println("received m3: " + m3);
+ sess.commit();
+ }
+
+ @Test
+ public void testPreCommitAcks() throws Exception {
+ conn = cf.createConnection();
+ Session session = conn.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 100;
+ for (int i = 0; i < noOfMessages; i++) {
+ producer.send(session.createTextMessage("m" + i));
+ }
+
+ conn.start();
+ for (int i = 0; i < noOfMessages; i++) {
+ Message m = consumer.receive(500);
+ Assert.assertNotNull(m);
+ }
+
+ SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+ Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
+ Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
+ }
+
+ @Test
+ public void testIndividualACK() throws Exception {
+ Connection conn = cf.createConnection();
+ Session session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 100;
+ for (int i = 0; i < noOfMessages; i++) {
+ producer.send(session.createTextMessage("m" + i));
+ }
+
+ conn.start();
+
+ // Consume even numbers first
+ for (int i = 0; i < noOfMessages; i++) {
+ Message m = consumer.receive(500);
+ Assert.assertNotNull(m);
+ if (i % 2 == 0) {
+ m.acknowledge();
+ }
+ }
+
+ session.close();
+
+ session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
+
+ consumer = session.createConsumer(jBossQueue);
+
+ // Consume odd numbers first
+ for (int i = 0; i < noOfMessages; i++) {
+ if (i % 2 == 0) {
+ continue;
+ }
+
+ TextMessage m = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals("m" + i, m.getText());
+ }
+
+ SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+ Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
+ conn.close();
+ }
+
+ @Test
+ public void testIndividualACKMessageConsumer() throws Exception {
+ Connection conn = cf.createConnection();
+ Session session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 100;
+ for (int i = 0; i < noOfMessages; i++) {
+ producer.setPriority(2);
+ producer.send(session.createTextMessage("m" + i));
+ }
+
+ conn.start();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+ final ReusableLatch latch = new ReusableLatch();
+ latch.setCount(noOfMessages);
+
+ class MessageAckEven implements MessageListener {
+
+ int count = 0;
+
+ @Override
+ public void onMessage(Message msg) {
+ try {
+ TextMessage txtmsg = (TextMessage) msg;
+ if (!txtmsg.getText().equals("m" + count)) {
+
+ errors.incrementAndGet();
+ }
+
+ if (count % 2 == 0) {
+ msg.acknowledge();
+ }
+
+ count++;
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ }
+
+ consumer.setMessageListener(new MessageAckEven());
+
+ Assert.assertTrue(latch.await(5000));
+
+ session.close();
+
+ session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
+
+ consumer = session.createConsumer(jBossQueue);
+
+ // Consume odd numbers first
+ for (int i = 0; i < noOfMessages; i++) {
+ if (i % 2 == 0) {
+ continue;
+ }
+
+ TextMessage m = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals("m" + i, m.getText());
+ }
+
+ SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+ Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
+ conn.close();
+ }
+
+ @Test
+ public void testPreCommitAcksSetOnConnectionFactory() throws Exception {
+ ((ActiveMQConnectionFactory) cf).setPreAcknowledge(true);
+ conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 100;
+ for (int i = 0; i < noOfMessages; i++) {
+ producer.send(session.createTextMessage("m" + i));
+ }
+
+ conn.start();
+ for (int i = 0; i < noOfMessages; i++) {
+ Message m = consumer.receive(500);
+ Assert.assertNotNull(m);
+ }
+
+ // Messages should all have been acked since we set pre ack on the cf
+ SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+ Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
+ }
+
+ @Test
+ public void testPreCommitAcksWithMessageExpiry() throws Exception {
+ JmsConsumerTest.log.info("starting test");
+
+ conn = cf.createConnection();
+ Session session = conn.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 1000;
+ for (int i = 0; i < noOfMessages; i++) {
+ TextMessage textMessage = session.createTextMessage("m" + i);
+ producer.setTimeToLive(1);
+ producer.send(textMessage);
+ }
+
+ Thread.sleep(2);
+
+ conn.start();
+
+ Message m = consumer.receiveNoWait();
+ Assert.assertNull(m);
+
+ // Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
+ // point
+ // which can cause delivering count to flip to 1
+
+ }
+
+ @Test
+ public void testPreCommitAcksWithMessageExpirySetOnConnectionFactory() throws Exception {
+ ((ActiveMQConnectionFactory) cf).setPreAcknowledge(true);
+ conn = cf.createConnection();
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 1000;
+ for (int i = 0; i < noOfMessages; i++) {
+ TextMessage textMessage = session.createTextMessage("m" + i);
+ producer.setTimeToLive(1);
+ producer.send(textMessage);
+ }
+
+ Thread.sleep(2);
+
+ conn.start();
+ Message m = consumer.receiveNoWait();
+ Assert.assertNull(m);
+
+ // Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
+ // point
+ // which can cause delivering count to flip to 1
+ }
+
+ @Test
+ public void testBrowserAndConsumerSimultaneous() throws Exception {
+ ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
+ conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+
+ QueueBrowser browser = session.createBrowser(jBossQueue);
+ Enumeration enumMessages = browser.getEnumeration();
+
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 10;
+ for (int i = 0; i < noOfMessages; i++) {
+ TextMessage textMessage = session.createTextMessage("m" + i);
+ textMessage.setIntProperty("i", i);
+ producer.send(textMessage);
+ }
+
+ conn.start();
+ for (int i = 0; i < noOfMessages; i++) {
+ TextMessage msg = (TextMessage) enumMessages.nextElement();
+ Assert.assertNotNull(msg);
+ Assert.assertEquals(i, msg.getIntProperty("i"));
+
+ conn.start();
+ TextMessage recvMessage = (TextMessage) consumer.receiveNoWait();
+ Assert.assertNotNull(recvMessage);
+ conn.stop();
+ Assert.assertEquals(i, msg.getIntProperty("i"));
+ }
+
+ Assert.assertNull(consumer.receiveNoWait());
+ Assert.assertFalse(enumMessages.hasMoreElements());
+
+ conn.close();
+
+ // Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
+ // point
+ // which can cause delivering count to flip to 1
+ }
+
+ @Test
+ public void testBrowserAndConsumerSimultaneousDifferentConnections() throws Exception {
+ ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
+ conn = cf.createConnection();
+
+ Connection connConsumer = cf.createConnection();
+ Session sessionConsumer = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = sessionConsumer.createConsumer(jBossQueue);
+ int noOfMessages = 1000;
+ for (int i = 0; i < noOfMessages; i++) {
+ TextMessage textMessage = session.createTextMessage("m" + i);
+ textMessage.setIntProperty("i", i);
+ producer.send(textMessage);
+ }
+
+ connConsumer.start();
+
+ QueueBrowser browser = session.createBrowser(jBossQueue);
+ Enumeration enumMessages = browser.getEnumeration();
+
+ for (int i = 0; i < noOfMessages; i++) {
+ TextMessage msg = (TextMessage) enumMessages.nextElement();
+ Assert.assertNotNull(msg);
+ Assert.assertEquals(i, msg.getIntProperty("i"));
+
+ TextMessage recvMessage = (TextMessage) consumer.receiveNoWait();
+ Assert.assertNotNull(recvMessage);
+ Assert.assertEquals(i, msg.getIntProperty("i"));
+ }
+
+ Message m = consumer.receiveNoWait();
+ Assert.assertFalse(enumMessages.hasMoreElements());
+ Assert.assertNull(m);
+
+ conn.close();
+ }
+
+ @Test
+ public void testBrowserOnly() throws Exception {
+ ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
+ conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ int noOfMessages = 10;
+ for (int i = 0; i < noOfMessages; i++) {
+ TextMessage textMessage = session.createTextMessage("m" + i);
+ textMessage.setIntProperty("i", i);
+ producer.send(textMessage);
+ }
+
+ QueueBrowser browser = session.createBrowser(jBossQueue);
+ Enumeration enumMessages = browser.getEnumeration();
+
+ for (int i = 0; i < noOfMessages; i++) {
+ Assert.assertTrue(enumMessages.hasMoreElements());
+ TextMessage msg = (TextMessage) enumMessages.nextElement();
+ Assert.assertNotNull(msg);
+ Assert.assertEquals(i, msg.getIntProperty("i"));
+
+ }
+
+ Assert.assertFalse(enumMessages.hasMoreElements());
+
+ conn.close();
+
+ // Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
+ // point
+ // which can cause delivering count to flip to 1
+ }
+
+ @Test
+ public void testClearExceptionListener() throws Exception {
+ conn = cf.createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(final Message msg) {
+ }
+ });
+
+ consumer.setMessageListener(null);
+ consumer.receiveNoWait();
+ }
+
+ @Test
+ public void testCantReceiveWhenListenerIsSet() throws Exception {
+ conn = cf.createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(final Message msg) {
+ }
+ });
+
+ try {
+ consumer.receiveNoWait();
+ Assert.fail("Should throw exception");
+ } catch (JMSException e) {
+ // Ok
+ }
+ }
+
+ @Test
+ public void testSharedConsumer() throws Exception {
+ conn = cf.createConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ topic = ActiveMQJMSClient.createTopic(T_NAME);
+
+ MessageConsumer cons = session.createSharedConsumer(topic, "test1");
+
+ MessageProducer producer = session.createProducer(topic);
+
+ producer.send(session.createTextMessage("test"));
+
+ TextMessage txt = (TextMessage) cons.receive(5000);
+
+ Assert.assertNotNull(txt);
+ }
+
+ @Test
+ public void testSharedDurableConsumer() throws Exception {
+ conn = cf.createConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ topic = ActiveMQJMSClient.createTopic(T_NAME);
+
+ MessageConsumer cons = session.createSharedDurableConsumer(topic, "test1");
+
+ MessageProducer producer = session.createProducer(topic);
+
+ producer.send(session.createTextMessage("test"));
+
+ TextMessage txt = (TextMessage) cons.receive(5000);
+
+ Assert.assertNotNull(txt);
+ }
+
+ @Test
+ public void testSharedDurableConsumerWithClientID() throws Exception {
+ conn = cf.createConnection();
+ conn.setClientID("C1");
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Connection conn2 = cf.createConnection();
+ conn2.setClientID("C2");
+ Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ {
+ Connection conn3 = cf.createConnection();
+
+ boolean exception = false;
+ try {
+ conn3.setClientID("C2");
+ } catch (Exception e) {
+ exception = true;
+ }
+
+ Assert.assertTrue(exception);
+ conn3.close();
+ }
+
+ topic = ActiveMQJMSClient.createTopic(T_NAME);
+
+ MessageConsumer cons = session.createSharedDurableConsumer(topic, "test1");
+
+ MessageProducer producer = session.createProducer(topic);
+
+ producer.send(session.createTextMessage("test"));
+
+ TextMessage txt = (TextMessage) cons.receive(5000);
+
+ Assert.assertNotNull(txt);
+ }
+
+ @Test
+ public void testValidateExceptionsThroughSharedConsumers() throws Exception {
+ conn = cf.createConnection();
+ conn.setClientID("C1");
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Connection conn2 = cf.createConnection();
+ conn2.setClientID("C2");
+
+ MessageConsumer cons = session.createSharedConsumer(topic, "cons1");
+ boolean exceptionHappened = false;
+ try {
+ MessageConsumer cons2Error = session.createSharedConsumer(topic2, "cons1");
+ } catch (JMSException e) {
+ exceptionHappened = true;
+ }
+
+ Assert.assertTrue(exceptionHappened);
+
+ MessageProducer producer = session.createProducer(topic2);
+
+ // This is durable, different than the one on topic... So it should go through
+ MessageConsumer cons2 = session.createSharedDurableConsumer(topic2, "cons1");
+
+ conn.start();
+
+ producer.send(session.createTextMessage("hello!"));
+
+ TextMessage msg = (TextMessage) cons2.receive(5000);
+ Assert.assertNotNull(msg);
+
+ exceptionHappened = false;
+ try {
+ session.unsubscribe("cons1");
+ } catch (JMSException e) {
+ exceptionHappened = true;
+ }
+
+ Assert.assertTrue(exceptionHappened);
+ cons2.close();
+ conn.close();
+ conn2.close();
+
+ }
+
+ @Test
+ public void testUnsubscribeDurable() throws Exception {
+ conn = cf.createConnection();
+ conn.setClientID("C1");
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createSharedDurableConsumer(topic, "c1");
+
+ MessageProducer prod = session.createProducer(topic);
+
+ for (int i = 0; i < 100; i++) {
+ prod.send(session.createTextMessage("msg" + i));
+ }
+
+ Assert.assertNotNull(cons.receive(5000));
+
+ cons.close();
+
+ session.unsubscribe("c1");
+
+ cons = session.createSharedDurableConsumer(topic, "c1");
+
+ // it should be null since the queue was deleted through unsubscribe
+ Assert.assertNull(cons.receiveNoWait());
+ }
+
+ @Test
+ public void testShareDurable() throws Exception {
+ ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
+ conn = cf.createConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createSharedDurableConsumer(topic, "c1");
+ MessageConsumer cons2 = session2.createSharedDurableConsumer(topic, "c1");
+
+ MessageProducer prod = session.createProducer(topic);
+
+ for (int i = 0; i < 100; i++) {
+ prod.send(session.createTextMessage("msg" + i));
+ }
+
+ for (int i = 0; i < 50; i++) {
+ Message msg = cons.receive(5000);
+ Assert.assertNotNull(msg);
+ msg = cons2.receive(5000);
+ Assert.assertNotNull(msg);
+ }
+
+ Assert.assertNull(cons.receiveNoWait());
+ Assert.assertNull(cons2.receiveNoWait());
+
+ cons.close();
+
+ boolean exceptionHappened = false;
+
+ try {
+ session.unsubscribe("c1");
+ } catch (JMSException e) {
+ exceptionHappened = true;
+ }
+
+ Assert.assertTrue(exceptionHappened);
+
+ cons2.close();
+
+ for (int i = 0; i < 100; i++) {
+ prod.send(session.createTextMessage("msg" + i));
+ }
+
+ session.unsubscribe("c1");
+
+ cons = session.createSharedDurableConsumer(topic, "c1");
+
+ // it should be null since the queue was deleted through unsubscribe
+ Assert.assertNull(cons.receiveNoWait());
+ }
+
+ @Test
+ public void testShareDuraleWithJMSContext() throws Exception {
+ ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
+ JMSContext conn = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE);
+
+ JMSConsumer consumer = conn.createSharedDurableConsumer(topic, "c1");
+
+ JMSProducer producer = conn.createProducer();
+
+ for (int i = 0; i < 100; i++) {
+ producer.setProperty("count", i).send(topic, "test" + i);
+ }
+
+ JMSContext conn2 = conn.createContext(JMSContext.AUTO_ACKNOWLEDGE);
+ JMSConsumer consumer2 = conn2.createSharedDurableConsumer(topic, "c1");
+
+ for (int i = 0; i < 50; i++) {
+ String txt = consumer.receiveBody(String.class, 5000);
+ System.out.println("TXT:" + txt);
+ Assert.assertNotNull(txt);
+
+ txt = consumer.receiveBody(String.class, 5000);
+ System.out.println("TXT:" + txt);
+ Assert.assertNotNull(txt);
+ }
+
+ Assert.assertNull(consumer.receiveNoWait());
+ Assert.assertNull(consumer2.receiveNoWait());
+
+ boolean exceptionHappened = false;
+
+ try {
+ conn.unsubscribe("c1");
+ } catch (Exception e) {
+ e.printStackTrace();
+ exceptionHappened = true;
+ }
+
+ Assert.assertTrue(exceptionHappened);
+
+ consumer.close();
+ consumer2.close();
+ conn2.close();
+
+ conn.unsubscribe("c1");
+
+ }
+
+ @Test
+ public void defaultAutoCreatedQueueConfigTest() throws Exception {
+ final String queueName = "q1";
+
+ server.getAddressSettingsRepository()
+ .addMatch(queueName, new AddressSettings()
+ .setDefaultMaxConsumers(5)
+ .setDefaultDeleteOnNoConsumers(true));
+
+ Connection connection = cf.createConnection();
+
+ Session session = connection.createSession();
+
+ session.createConsumer(session.createQueue(queueName));
+
+ org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName));
+
+ assertEquals(5, queue.getMaxConsumers());
+ assertEquals(true, queue.isDeleteOnNoConsumers());
+
+ connection.close();
+ }
+}