You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:36 UTC
[27/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java
deleted file mode 100644
index ea794ff..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import javax.jms.*;
-import javax.jms.Queue;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class AMQ2171Test implements Thread.UncaughtExceptionHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2171Test.class);
- private static final String BROKER_URL = "tcp://localhost:0";
- private static final int QUEUE_SIZE = 100;
-
- private static BrokerService brokerService;
- private static Queue destination;
-
- private String brokerUri;
- private String brokerUriNoPrefetch;
- private Collection<Throwable> exceptions = new CopyOnWriteArrayList<>();
-
- @Before
- public void setUp() throws Exception {
- // Start an embedded broker up.
- brokerService = new BrokerService();
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.addConnector(BROKER_URL);
- brokerService.start();
-
- brokerUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString().toString();
- brokerUriNoPrefetch = brokerUri + "?jms.prefetchPolicy.all=0";
-
- destination = new ActiveMQQueue("Test");
- produce(brokerUri, QUEUE_SIZE);
- }
-
- @Before
- public void addHandler() {
- Thread.setDefaultUncaughtExceptionHandler(this);
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- }
-
- @Test(timeout = 10000)
- public void testBrowsePrefetch() throws Exception {
- runTest(brokerUri);
- }
-
- @Test(timeout = 10000)
- public void testBrowseNoPrefetch() throws Exception {
- runTest(brokerUriNoPrefetch);
- }
-
- private void runTest(String brokerURL) throws Exception {
-
- Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
-
- try {
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Enumeration<Message> unread = session.createBrowser(destination).getEnumeration();
-
- int count = 0;
- while (unread.hasMoreElements()) {
- unread.nextElement();
- count++;
- }
-
- assertEquals(QUEUE_SIZE, count);
- assertTrue(exceptions.isEmpty());
- }
- finally {
- try {
- connection.close();
- }
- catch (JMSException e) {
- exceptions.add(e);
- }
- }
- }
-
- private static void produce(String brokerURL, int count) throws Exception {
- Connection connection = null;
-
- try {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
- connection = factory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setTimeToLive(0);
- connection.start();
-
- for (int i = 0; i < count; i++) {
- int id = i + 1;
- TextMessage message = session.createTextMessage("Message " + id);
- message.setIntProperty("MsgNumber", id);
- producer.send(message);
-
- if (id % 500 == 0) {
- LOG.info("sent " + id + ", ith " + message);
- }
- }
- }
- finally {
- try {
- if (connection != null) {
- connection.close();
- }
- }
- catch (Throwable e) {
- }
- }
- }
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- exceptions.add(e);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java
deleted file mode 100644
index d6b4aaa..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.TopicSubscriptionViewMBean;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ2200Test {
-
- private static final String bindAddress = "tcp://0.0.0.0:0";
- private BrokerService broker;
- private ActiveMQConnectionFactory cf;
-
- @Before
- public void setUp() throws Exception {
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.addConnector(bindAddress);
- String address = broker.getTransportConnectors().get(0).getPublishableConnectString();
- broker.start();
- broker.waitUntilStarted();
-
- cf = new ActiveMQConnectionFactory(address);
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- @Test
- public void testTopicSubscriptionView() throws Exception {
- TopicConnection connection = cf.createTopicConnection();
- TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Topic destination = session.createTopic("TopicViewTestTopic");
- MessageConsumer consumer = session.createConsumer(destination);
- assertNotNull(consumer);
- TimeUnit.SECONDS.sleep(1);
-
- ObjectName subscriptionNames[] = broker.getAdminView().getTopicSubscribers();
- assertTrue(subscriptionNames.length > 0);
-
- boolean fail = true;
- for (ObjectName name : subscriptionNames) {
- if (name.toString().contains("TopicViewTestTopic")) {
- TopicSubscriptionViewMBean sub = (TopicSubscriptionViewMBean) broker.getManagementContext().newProxyInstance(name, TopicSubscriptionViewMBean.class, true);
- assertNotNull(sub);
- assertTrue(sub.getSessionId() != -1);
- // Check that its the default value then configure something new.
- assertTrue(sub.getMaximumPendingQueueSize() == -1);
- sub.setMaximumPendingQueueSize(1000);
- assertTrue(sub.getMaximumPendingQueueSize() != -1);
- fail = false;
- }
- }
-
- if (fail) {
- fail("Didn't find the TopicSubscriptionView");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java
deleted file mode 100644
index 2152e12..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ2213Test {
-
- BrokerService broker;
- ConnectionFactory factory;
- Connection connection;
- Session session;
- Queue queue;
- MessageConsumer consumer;
-
- public void createBroker(boolean deleteAll) throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(deleteAll);
- broker.setDataDirectory("target/AMQ3145Test");
- broker.setUseJmx(true);
- broker.getManagementContext().setCreateConnector(false);
- broker.addConnector("tcp://localhost:0");
- broker.start();
- broker.waitUntilStarted();
- factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- }
-
- @Before
- public void createBroker() throws Exception {
- createBroker(true);
- }
-
- @After
- public void tearDown() throws Exception {
- if (consumer != null) {
- consumer.close();
- }
- session.close();
- connection.stop();
- connection.close();
- broker.stop();
- }
-
- @Test
- public void testEqualsGenericSession() throws JMSException {
- assertNotNull(this.connection);
- Session sess = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- assertTrue(sess.equals(sess));
- }
-
- @Test
- public void testEqualsTopicSession() throws JMSException {
- assertNotNull(this.connection);
- assertTrue(this.connection instanceof TopicConnection);
- TopicSession sess = ((TopicConnection) this.connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- assertTrue(sess.equals(sess));
- }
-
- @Test
- public void testEqualsQueueSession() throws JMSException {
- assertNotNull(this.connection);
- assertTrue(this.connection instanceof QueueConnection);
- QueueSession sess = ((QueueConnection) this.connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- assertTrue(sess.equals(sess));
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
deleted file mode 100644
index fde821f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2314Test extends CombinationTestSupport {
-
- public boolean consumeAll = false;
- public int deliveryMode = DeliveryMode.NON_PERSISTENT;
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2314Test.class);
- private static final int MESSAGES_COUNT = 30000;
- private static byte[] buf = new byte[1024];
- private BrokerService broker;
- private String connectionUri;
-
- private static final long messageReceiveTimeout = 500L;
-
- Destination destination = new ActiveMQTopic("FooTwo");
-
- public void testRemoveSlowSubscriberWhacksTempStore() throws Exception {
- runProducerWithHungConsumer();
- }
-
- public void testMemoryUsageReleasedOnAllConsumed() throws Exception {
- consumeAll = true;
- runProducerWithHungConsumer();
- // do it again to ensure memory limits are decreased
- runProducerWithHungConsumer();
- }
-
- public void runProducerWithHungConsumer() throws Exception {
-
- final CountDownLatch consumerContinue = new CountDownLatch(1);
- final CountDownLatch consumerReady = new CountDownLatch(1);
-
- final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- factory.setAlwaysSyncSend(true);
-
- // ensure messages are spooled to disk for this consumer
- ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
- prefetch.setTopicPrefetch(500);
- factory.setPrefetchPolicy(prefetch);
- final Connection connection = factory.createConnection();
- connection.start();
-
- Thread producingThread = new Thread("Producing thread") {
- @Override
- public void run() {
- try {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(deliveryMode);
- for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
- Message message = session.createTextMessage(new String(buf) + idx);
- producer.send(message);
- }
- producer.close();
- session.close();
- }
- catch (Throwable ex) {
- ex.printStackTrace();
- }
- }
- };
-
- Thread consumingThread = new Thread("Consuming thread") {
- @Override
- public void run() {
- try {
- int count = 0;
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(destination);
-
- while (consumer.receive(messageReceiveTimeout) == null) {
- consumerReady.countDown();
- }
- count++;
- LOG.info("Received one... waiting");
- consumerContinue.await();
- if (consumeAll) {
- LOG.info("Consuming the rest of the messages...");
- while (consumer.receive(messageReceiveTimeout) != null) {
- count++;
- }
- }
- LOG.info("consumer session closing: consumed count: " + count);
- session.close();
- }
- catch (Throwable ex) {
- ex.printStackTrace();
- }
- }
- };
- consumingThread.start();
- consumerReady.await();
-
- producingThread.start();
- producingThread.join();
-
- final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
- LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
- assertTrue("some temp store has been used", tempUsageBySubscription != origTempUsage);
- consumerContinue.countDown();
- consumingThread.join();
- connection.close();
-
- LOG.info("Subscription Usage: " + tempUsageBySubscription + ", endUsage: " + broker.getSystemUsage().getTempUsage().getUsage());
-
- assertTrue("temp usage decreased with removed sub", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker.getSystemUsage().getTempUsage().getUsage() < tempUsageBySubscription;
- }
- }));
- }
-
- @Override
- public void setUp() throws Exception {
- super.setAutoFail(true);
- super.setUp();
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.getSystemUsage().getMemoryUsage().setLimit(1024L * 1024 * 64);
-
- broker.addConnector("tcp://localhost:0").setName("Default");
- broker.start();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- @Override
- public void tearDown() throws Exception {
- broker.stop();
- }
-
- public static Test suite() {
- return suite(AMQ2314Test.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
deleted file mode 100644
index 2f9bb84..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-
-/*
- AMQ2356Test
- We have an environment where we have a very large number of destinations.
- In an effort to reduce the number of threads I have set the options
- -Dorg.apache.activemq.UseDedicatedTaskRunner=false
-
- and
-
- <policyEntry queue=">" optimizedDispatch="true"/>
-
- Unfortunately this very quickly leads to deadlocked queues.
-
- My environment is:
-
- ActiveMQ 5.2 Ubunty Jaunty kernel 2.6.28-14-generic #47-Ubuntu SMP (although only a single core on my system)
- TCP transportConnector
-
- To reproduce the bug (which I can do 100% of the time) I connect 5 consumers (AUTO_ACK) to 5 different queues.
- Then I start 5 producers and pair them up with a consumer on a queue, and they start sending PERSISTENT messages.
- I've set the producer to send 100 messages and disconnect, and the consumer to receive 100 messages and disconnect.
- The first pair usually gets through their 100 messages and disconnect, at which point all the other pairs have
- deadlocked at less than 30 messages each.
- */
-public class AMQ2356Test extends TestCase {
-
- protected static final int MESSAGE_COUNT = 1000;
- protected static final int NUMBER_OF_PAIRS = 10;
- protected BrokerService broker;
- protected String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
- protected int destinationCount;
-
- public void testScenario() throws Exception {
- for (int i = 0; i < NUMBER_OF_PAIRS; i++) {
- ActiveMQQueue queue = new ActiveMQQueue(getClass().getName() + ":" + i);
- ProducerConsumerPair cp = new ProducerConsumerPair();
- cp.start(this.brokerURL, queue, MESSAGE_COUNT);
- cp.testRun();
- cp.stop();
- }
- }
-
- protected Destination getDestination(Session session) throws JMSException {
- String destinationName = getClass().getName() + "." + destinationCount++;
- return session.createQueue(destinationName);
- }
-
- @Override
- protected void setUp() throws Exception {
- if (broker == null) {
- broker = createBroker();
- }
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- if (broker != null) {
- broker.stop();
- }
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- configureBroker(answer);
- answer.start();
- return answer;
- }
-
- protected void configureBroker(BrokerService answer) throws Exception {
- File dataFileDir = new File("target/test-amq-data/bugs/AMQ2356/kahadb");
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(dataFileDir);
- answer.setUseJmx(false);
- // Setup a destination policy where it takes only 1 message at a time.
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry policy = new PolicyEntry();
- policy.setOptimizedDispatch(true);
- policyMap.setDefaultEntry(policy);
- answer.setDestinationPolicy(policyMap);
-
- answer.setAdvisorySupport(false);
- answer.setEnableStatistics(false);
- answer.setDeleteAllMessagesOnStartup(true);
- answer.addConnector(brokerURL);
-
- }
-
- static class ProducerConsumerPair {
-
- private Destination destination;
- private MessageProducer producer;
- private MessageConsumer consumer;
- private Connection producerConnection;
- private Connection consumerConnection;
- private int numberOfMessages;
-
- ProducerConsumerPair() {
-
- }
-
- void start(String brokerURL, final Destination dest, int msgNum) throws Exception {
- this.destination = dest;
- this.numberOfMessages = msgNum;
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL);
- this.producerConnection = cf.createConnection();
- this.producerConnection.start();
- this.consumerConnection = cf.createConnection();
- this.consumerConnection.start();
- this.producer = createProducer(this.producerConnection);
- this.consumer = createConsumer(this.consumerConnection);
- }
-
- void testRun() throws Exception {
-
- Session s = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- for (int i = 0; i < this.numberOfMessages; i++) {
- BytesMessage msg = s.createBytesMessage();
- msg.writeBytes(new byte[1024]);
- this.producer.send(msg);
- }
- int received = 0;
- for (int i = 0; i < this.numberOfMessages; i++) {
- Message msg = this.consumer.receive();
- assertNotNull(msg);
- received++;
- }
- assertEquals("Messages received on " + this.destination, this.numberOfMessages, received);
-
- }
-
- void stop() throws Exception {
- if (this.producerConnection != null) {
- this.producerConnection.close();
- }
- if (this.consumerConnection != null) {
- this.consumerConnection.close();
- }
- }
-
- private MessageProducer createProducer(Connection connection) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer result = session.createProducer(this.destination);
- return result;
- }
-
- private MessageConsumer createConsumer(Connection connection) throws Exception {
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer result = session.createConsumer(this.destination);
- return result;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
deleted file mode 100644
index 5f79b6c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-//package org.apache.activemq.transport.failover;
-
-import static org.junit.Assert.assertEquals;
-
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.state.ConnectionStateTracker;
-import org.apache.activemq.state.TransactionState;
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.ResponseCorrelator;
-import org.apache.activemq.transport.failover.FailoverTransport;
-import org.junit.Test;
-
-public class AMQ2364Test {
-
- @SuppressWarnings("unchecked")
- @Test
- public void testRollbackLeak() throws Exception {
-
- int messageCount = 1000;
- URI failoverUri = new URI("failover:(vm://localhost)?jms.redeliveryPolicy.maximumRedeliveries=0");
-
- Destination dest = new ActiveMQQueue("Failover.Leak");
-
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri);
- ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
- connection.start();
- final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageProducer producer = session.createProducer(dest);
-
- for (int i = 0; i < messageCount; ++i)
- producer.send(session.createTextMessage("Test message #" + i));
- producer.close();
- session.commit();
-
- MessageConsumer consumer = session.createConsumer(dest);
-
- final CountDownLatch latch = new CountDownLatch(messageCount);
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message msg) {
- try {
- session.rollback();
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- finally {
- latch.countDown();
- }
- }
- });
-
- latch.await();
- consumer.close();
- session.close();
-
- ResponseCorrelator respCorr = (ResponseCorrelator) connection.getTransport();
- MutexTransport mutexTrans = (MutexTransport) respCorr.getNext();
- FailoverTransport failoverTrans = (FailoverTransport) mutexTrans.getNext();
- Field stateTrackerField = FailoverTransport.class.getDeclaredField("stateTracker");
- stateTrackerField.setAccessible(true);
- ConnectionStateTracker stateTracker = (ConnectionStateTracker) stateTrackerField.get(failoverTrans);
- Field statesField = ConnectionStateTracker.class.getDeclaredField("connectionStates");
- statesField.setAccessible(true);
- ConcurrentHashMap<ConnectionId, ConnectionState> states = (ConcurrentHashMap<ConnectionId, ConnectionState>) statesField.get(stateTracker);
-
- ConnectionState state = states.get(connection.getConnectionInfo().getConnectionId());
-
- Collection<TransactionState> transactionStates = state.getTransactionStates();
-
- connection.stop();
- connection.close();
-
- assertEquals("Transaction states not cleaned up", 0, transactionStates.size());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java
deleted file mode 100644
index f4e7908..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.*;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.junit.Test;
-
-public class AMQ2383Test {
-
- @Test
- public void activeMQTest() throws Exception {
- Destination dest = ActiveMQDestination.createDestination("testQueue", ActiveMQDestination.QUEUE_TYPE);
- ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false&broker.persistent=false");
- Connection producerConnection = factory.createConnection();
- producerConnection.start();
- Connection consumerConnection = factory.createConnection();
- consumerConnection.start();
-
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(dest);
- TextMessage sentMsg = producerSession.createTextMessage("test...");
- producer.send(sentMsg);
- producerSession.close();
-
- Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = consumerSession.createConsumer(dest);
- TextMessage receivedMsg = (TextMessage) consumer.receive();
- consumerSession.rollback();
- consumerSession.close();
-
- assertEquals(sentMsg, receivedMsg);
-
- producerConnection.close();
- consumerConnection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java
deleted file mode 100644
index edd4e8f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An AMQ-2401 Test
- */
-public class AMQ2401Test extends TestCase implements MessageListener {
-
- private BrokerService broker;
- private ActiveMQConnectionFactory factory;
- private static final int SEND_COUNT = 500;
- private static final int CONSUMER_COUNT = 50;
- private static final int PRODUCER_COUNT = 1;
- private static final int LOG_INTERVAL = 10;
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2401Test.class);
-
- private final ArrayList<Service> services = new ArrayList<>(CONSUMER_COUNT + PRODUCER_COUNT);
- private int count = 0;
- private CountDownLatch latch;
-
- @Override
- protected void setUp() throws Exception {
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test");
- broker.setDeleteAllMessagesOnStartup(true);
- String connectionUri = broker.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
- PolicyMap policies = new PolicyMap();
- PolicyEntry entry = new PolicyEntry();
- entry.setMemoryLimit(1024 * 100);
- entry.setProducerFlowControl(true);
- entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
- entry.setQueue(">");
- policies.setDefaultEntry(entry);
- broker.setDestinationPolicy(policies);
- broker.setUseJmx(false);
- broker.start();
- broker.waitUntilStarted();
-
- factory = new ActiveMQConnectionFactory(connectionUri);
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- public void testDupsOk() throws Exception {
-
- latch = new CountDownLatch(SEND_COUNT);
-
- for (int i = 0; i < CONSUMER_COUNT; i++) {
- TestConsumer consumer = new TestConsumer();
- consumer.start();
- services.add(consumer);
- }
- for (int i = 0; i < PRODUCER_COUNT; i++) {
- TestProducer producer = new TestProducer();
- producer.start();
- services.add(producer);
- }
-
- waitForMessageReceipt(TimeUnit.SECONDS.toMillis(30));
- }
-
- @Override
- public void onMessage(Message message) {
- latch.countDown();
- if (++count % LOG_INTERVAL == 0) {
- LOG.debug("Received message " + count);
- }
-
- try {
- Thread.sleep(1);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * @throws InterruptedException
- * @throws TimeoutException
- */
- private void waitForMessageReceipt(long timeout) throws InterruptedException, TimeoutException {
- if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
- throw new TimeoutException(String.format("Consumner didn't receive expected # of messages, %d of %d received.", latch.getCount(), SEND_COUNT));
- }
- }
-
- private interface Service {
-
- public void start() throws Exception;
-
- public void close();
- }
-
- private class TestProducer implements Runnable, Service {
-
- Thread thread;
- BytesMessage message;
-
- Connection connection;
- Session session;
- MessageProducer producer;
-
- TestProducer() throws Exception {
- thread = new Thread(this, "TestProducer");
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- producer = session.createProducer(session.createQueue("AMQ2401Test"));
- }
-
- @Override
- public void start() {
- thread.start();
- }
-
- @Override
- public void run() {
-
- int count = SEND_COUNT / PRODUCER_COUNT;
- for (int i = 1; i <= count; i++) {
- try {
- if ((i % LOG_INTERVAL) == 0) {
- LOG.debug("Sending: " + i);
- }
- message = session.createBytesMessage();
- message.writeBytes(new byte[1024]);
- producer.send(message);
- }
- catch (JMSException jmse) {
- jmse.printStackTrace();
- break;
- }
- }
- }
-
- @Override
- public void close() {
- try {
- connection.close();
- }
- catch (JMSException e) {
- }
- }
- }
-
- private class TestConsumer implements Runnable, Service {
-
- ActiveMQConnection connection;
- Session session;
- MessageConsumer consumer;
-
- TestConsumer() throws Exception {
- factory.setOptimizeAcknowledge(false);
- connection = (ActiveMQConnection) factory.createConnection();
-
- session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- consumer = session.createConsumer(session.createQueue("AMQ2401Test"));
-
- consumer.setMessageListener(AMQ2401Test.this);
- }
-
- @Override
- public void start() throws Exception {
- connection.start();
- }
-
- @Override
- public void close() {
- try {
- connection.close();
- }
- catch (JMSException e) {
- }
- }
-
- @Override
- public void run() {
- while (latch.getCount() > 0) {
- try {
- onMessage(consumer.receive());
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java
deleted file mode 100644
index ed1af90..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Vector;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2413Test extends CombinationTestSupport implements MessageListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2413Test.class);
- BrokerService broker;
- private ActiveMQConnectionFactory factory;
-
- private static final int HANG_THRESHOLD = 60;
- private static final int SEND_COUNT = 1000;
- private static final int RECEIVER_THINK_TIME = 1;
- private static final int CONSUMER_COUNT = 1;
- private static final int PRODUCER_COUNT = 50;
- private static final int TO_SEND = SEND_COUNT / PRODUCER_COUNT;
-
- public int deliveryMode = DeliveryMode.NON_PERSISTENT;
- public int ackMode = Session.DUPS_OK_ACKNOWLEDGE;
- public boolean useVMCursor = false;
- public boolean useOptimizeAcks = false;
-
- private final ArrayList<Service> services = new ArrayList<>(CONSUMER_COUNT + PRODUCER_COUNT);
- AtomicInteger count = new AtomicInteger(0);
- Semaphore receivedMessages;
- AtomicBoolean running = new AtomicBoolean(false);
-
- public void initCombos() {
- addCombinationValues("deliveryMode", new Object[]{DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT});
- addCombinationValues("ackMode", new Object[]{Session.DUPS_OK_ACKNOWLEDGE, Session.AUTO_ACKNOWLEDGE});
- addCombinationValues("useVMCursor", new Object[]{true, false});
- // addCombinationValues("useOptimizeAcks", new Object[] {true, false});
- }
-
- @Override
- protected void setUp() throws Exception {
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test");
- broker.setDeleteAllMessagesOnStartup(true);
-
- KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
- kahaDb.setConcurrentStoreAndDispatchQueues(false);
- broker.addConnector("tcp://0.0.0.0:2401");
- PolicyMap policies = new PolicyMap();
- PolicyEntry entry = new PolicyEntry();
- entry.setMemoryLimit(1024 * 1024);
- entry.setProducerFlowControl(true);
- if (useVMCursor) {
- entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
- }
- entry.setQueue(">");
- policies.setDefaultEntry(entry);
- broker.setDestinationPolicy(policies);
- broker.start();
- broker.waitUntilStarted();
-
- count.set(0);
- receivedMessages = new Semaphore(0);
-
- factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2401");
- // factory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false&broker.persistent=false");
- setAutoFail(true);
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- running.set(false);
- for (Service service : services) {
- service.close();
- }
-
- broker.stop();
- broker.waitUntilStopped();
-
- super.tearDown();
- }
-
- public void testReceipt() throws Exception {
-
- running.set(true);
-
- for (int i = 0; i < CONSUMER_COUNT; i++) {
- TestConsumer consumer = new TestConsumer();
- consumer.start();
- services.add(consumer);
- }
- for (int i = 0; i < PRODUCER_COUNT; i++) {
- TestProducer producer = new TestProducer(i);
- producer.start();
- services.add(producer);
- }
-
- waitForMessageReceipt();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
- */
- @Override
- public void onMessage(Message message) {
- receivedMessages.release();
- if (count.incrementAndGet() % 100 == 0) {
- LOG.info("Received message " + count);
- }
- track(message);
- if (RECEIVER_THINK_TIME > 0) {
- try {
- Thread.sleep(RECEIVER_THINK_TIME);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- }
-
- HashMap<ProducerId, boolean[]> tracker = new HashMap<>();
-
- private synchronized void track(Message message) {
- try {
- MessageId id = new MessageId(message.getJMSMessageID());
- ProducerId pid = id.getProducerId();
- int seq = (int) id.getProducerSequenceId();
- boolean[] ids = tracker.get(pid);
- if (ids == null) {
- ids = new boolean[TO_SEND + 1];
- ids[seq] = true;
- tracker.put(pid, ids);
- }
- else {
- assertTrue("not already received: " + id, !ids[seq]);
- ids[seq] = true;
- }
- }
- catch (Exception e) {
- LOG.error(e.toString());
- }
- }
-
- /**
- * @throws InterruptedException
- * @throws TimeoutException
- */
- private void waitForMessageReceipt() throws InterruptedException, TimeoutException {
- try {
- while (count.get() < SEND_COUNT) {
- if (!receivedMessages.tryAcquire(HANG_THRESHOLD, TimeUnit.SECONDS)) {
- if (count.get() == SEND_COUNT)
- break;
- verifyTracking();
- throw new TimeoutException("@count=" + count.get() + " Message not received for more than " + HANG_THRESHOLD + " seconds");
- }
- }
- }
- finally {
- running.set(false);
- }
- }
-
- private void verifyTracking() {
- Vector<MessageId> missing = new Vector<>();
- for (ProducerId pid : tracker.keySet()) {
- boolean[] ids = tracker.get(pid);
- for (int i = 1; i < TO_SEND + 1; i++) {
- if (!ids[i]) {
- missing.add(new MessageId(pid, i));
- }
- }
- }
- assertTrue("No missing messages: " + missing, missing.isEmpty());
- }
-
- private interface Service {
-
- public void start() throws Exception;
-
- public void close();
- }
-
- private class TestProducer implements Runnable, Service {
-
- Thread thread;
- BytesMessage message;
- Connection connection;
- Session session;
- MessageProducer producer;
-
- TestProducer(int id) throws Exception {
- thread = new Thread(this, "TestProducer-" + id);
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- producer = session.createProducer(session.createQueue("AMQ2401Test"));
- }
-
- @Override
- public void start() {
- thread.start();
- }
-
- @Override
- public void run() {
-
- int i = 1;
- for (; i <= TO_SEND; i++) {
- try {
-
- if (+i % 100 == 0) {
- LOG.info(Thread.currentThread().getName() + " Sending message " + i);
- }
- message = session.createBytesMessage();
- message.writeBytes(new byte[1024]);
- producer.setDeliveryMode(deliveryMode);
- producer.send(message);
- }
- catch (JMSException jmse) {
- jmse.printStackTrace();
- break;
- }
- }
- LOG.info(Thread.currentThread().getName() + " Sent: " + (i - 1));
- }
-
- @Override
- public void close() {
- try {
- connection.close();
- }
- catch (JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
-
- private class TestConsumer implements Runnable, Service {
-
- ActiveMQConnection connection;
- Session session;
- MessageConsumer consumer;
-
- TestConsumer() throws Exception {
- factory.setOptimizeAcknowledge(false);
- connection = (ActiveMQConnection) factory.createConnection();
- if (useOptimizeAcks) {
- connection.setOptimizeAcknowledge(true);
- }
-
- session = connection.createSession(false, ackMode);
- consumer = session.createConsumer(session.createQueue("AMQ2401Test"));
-
- consumer.setMessageListener(AMQ2413Test.this);
- }
-
- @Override
- public void start() throws Exception {
- connection.start();
- }
-
- @Override
- public void close() {
- try {
- connection.close();
- }
- catch (JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- while (running.get()) {
- try {
- onMessage(consumer.receive());
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- public static Test suite() {
- return suite(AMQ2413Test.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
deleted file mode 100644
index f4fb8a2..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2439Test extends JmsMultipleBrokersTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2439Test.class);
- Destination dest;
-
- public void testDuplicatesThroughNetwork() throws Exception {
- assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500));
- assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500));
- validateQueueStats();
- }
-
- private void validateQueueStats() throws Exception {
- final BrokerView brokerView = brokers.get("BrokerA").broker.getAdminView();
- assertEquals("enequeue is correct", 1000, brokerView.getTotalEnqueueCount());
-
- assertTrue("dequeue is correct", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("dequeue count (want 1000), is : " + brokerView.getTotalDequeueCount());
- return 1000 == brokerView.getTotalDequeueCount();
- }
- }));
- }
-
- protected int receiveExactMessages(String brokerName, int msgCount) throws Exception {
-
- BrokerItem brokerItem = brokers.get(brokerName);
- Connection connection = brokerItem.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(dest);
-
- Message msg;
- int i;
- for (i = 0; i < msgCount; i++) {
- msg = consumer.receive(1000);
- if (msg == null) {
- break;
- }
- }
-
- connection.close();
- brokerItem.connections.remove(connection);
-
- return i;
- }
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
- createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=true&deleteAllMessagesOnStartup=true&advisorySupport=false"));
- createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=true&deleteAllMessagesOnStartup=true&useJmx=false"));
- bridgeBrokers("BrokerA", "BrokerB");
-
- startAllBrokers();
-
- // Create queue
- dest = createDestination("TEST.FOO", false);
- sendMessages("BrokerA", dest, 1000);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java
deleted file mode 100644
index bcd2db1..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.command.ActiveMQQueue;
-
-/**
- * In CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE modes following exception
- * occurs when ASYNCH consumers acknowledges messages in not in order they
- * received the messages.
- * <p>
- * Exception thrown on broker side:
- * <p>
- * {@code javax.jms.JMSException: Could not correlate acknowledgment with
- * dispatched message: MessageAck}
- *
- * @author daroo
- */
-public class AMQ2489Test extends TestSupport {
-
- private final static String SEQ_NUM_PROPERTY = "seqNum";
-
- private final static int TOTAL_MESSAGES_CNT = 2;
- private final static int CONSUMERS_CNT = 2;
-
- private final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
-
- private Connection connection;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- connection = createConnection();
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- super.tearDown();
- }
-
- public void testUnorderedClientAcknowledge() throws Exception {
- doUnorderedAck(Session.CLIENT_ACKNOWLEDGE);
- }
-
- public void testUnorderedIndividualAcknowledge() throws Exception {
- doUnorderedAck(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
- }
-
- /**
- * Main test method
- *
- * @param acknowledgmentMode - ACK mode to be used by consumers
- * @throws Exception
- */
- protected void doUnorderedAck(int acknowledgmentMode) throws Exception {
- List<Consumer> consumers = null;
- Session producerSession = null;
-
- connection.start();
- // Because exception is thrown on broker side only, let's set up
- // exception listener to get it
- final TestExceptionListener exceptionListener = new TestExceptionListener();
- connection.setExceptionListener(exceptionListener);
- try {
- consumers = new ArrayList<>();
- // start customers
- for (int i = 0; i < CONSUMERS_CNT; i++) {
- consumers.add(new Consumer(acknowledgmentMode));
- }
-
- // produce few test messages
- producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageProducer producer = producerSession.createProducer(new ActiveMQQueue(getQueueName()));
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < TOTAL_MESSAGES_CNT; i++) {
- final Message message = producerSession.createTextMessage("test");
- // assign each message sequence number
- message.setIntProperty(SEQ_NUM_PROPERTY, i);
- producer.send(message);
- }
-
- // during each onMessage() calls consumers decreases the LATCH
- // counter.
- //
- // so, let's wait till all messages are consumed.
- //
- LATCH.await();
-
- // wait a bit more to give exception listener a chance be populated
- // with
- // broker's error
- TimeUnit.SECONDS.sleep(1);
-
- assertFalse(exceptionListener.getStatusText(), exceptionListener.hasExceptions());
-
- }
- finally {
- if (producerSession != null)
- producerSession.close();
-
- if (consumers != null) {
- for (Consumer c : consumers) {
- c.close();
- }
- }
- }
- }
-
- protected String getQueueName() {
- return getClass().getName() + "." + getName();
- }
-
- public final class Consumer implements MessageListener {
-
- final Session session;
-
- private Consumer(int acknowledgmentMode) {
- try {
- session = connection.createSession(false, acknowledgmentMode);
- final Queue queue = session.createQueue(getQueueName() + "?consumer.prefetchSize=1");
- final MessageConsumer consumer = session.createConsumer(queue);
- consumer.setMessageListener(this);
- }
- catch (JMSException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void onMessage(Message message) {
- try {
- // retrieve sequence number assigned by producer...
- final int seqNum = message.getIntProperty(SEQ_NUM_PROPERTY);
-
- // ...and let's delay every second message a little bit before
- // acknowledgment
- if ((seqNum % 2) == 0) {
- System.out.println("Delayed message sequence numeber: " + seqNum);
- try {
- TimeUnit.SECONDS.sleep(1);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- message.acknowledge();
- }
- catch (JMSException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- finally {
- // decrease LATCH counter in the main test method.
- LATCH.countDown();
- }
- }
-
- private void close() {
- if (session != null) {
- try {
- session.close();
- }
- catch (JMSException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- public final class TestExceptionListener implements ExceptionListener {
-
- private final java.util.Queue<Exception> exceptions = new ConcurrentLinkedQueue<>();
-
- @Override
- public void onException(JMSException e) {
- exceptions.add(e);
- }
-
- public boolean hasExceptions() {
- return exceptions.isEmpty() == false;
- }
-
- public String getStatusText() {
- final StringBuilder str = new StringBuilder();
- str.append("Exceptions count on broker side: " + exceptions.size() + ".\nMessages:\n");
- for (Exception e : exceptions) {
- str.append(e.getMessage() + "\n\n");
- }
- return str.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
deleted file mode 100644
index b18a7b4..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.apache.activemq.util.IOHelper;
-
-public class AMQ2512Test extends EmbeddedBrokerTestSupport {
-
- private static Connection connection;
- private final static String QUEUE_NAME = "dee.q";
- private final static int INITIAL_MESSAGES_CNT = 1000;
- private final static int WORKER_INTERNAL_ITERATIONS = 100;
- private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS + INITIAL_MESSAGES_CNT;
- private final static byte[] payload = new byte[5 * 1024];
- private final static String TEXT = new String(payload);
-
- private final static String PRP_INITIAL_ID = "initial-id";
- private final static String PRP_WORKER_ID = "worker-id";
-
- private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
-
- private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
-
- public void testKahaDBFailure() throws Exception {
- final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress);
- connection = fac.createConnection();
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue queue = session.createQueue(QUEUE_NAME);
- final MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- connection.start();
-
- final long startTime = System.nanoTime();
-
- final List<Consumer> consumers = new ArrayList<>();
- for (int i = 0; i < 20; i++) {
- consumers.add(new Consumer("worker-" + i));
- }
-
- for (int i = 0; i < INITIAL_MESSAGES_CNT; i++) {
- final TextMessage msg = session.createTextMessage(TEXT);
- msg.setStringProperty(PRP_INITIAL_ID, "initial-" + i);
- producer.send(msg);
- }
-
- LATCH.await();
- final long endTime = System.nanoTime();
- System.out.println("Total execution time = " + TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms].");
- System.out.println("Rate = " + TOTAL_MESSAGES_CNT / TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s].");
-
- for (Consumer c : consumers) {
- c.close();
- }
- connection.close();
- }
-
- private final static class Consumer implements MessageListener {
-
- private final String name;
- private final Session session;
- private final MessageProducer producer;
-
- private Consumer(String name) {
- this.name = name;
- try {
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final Queue queue = session.createQueue(QUEUE_NAME + "?consumer.prefetchSize=10");
- producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- final MessageConsumer consumer = session.createConsumer(queue);
- consumer.setMessageListener(this);
- }
- catch (JMSException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void onMessage(Message message) {
- final TextMessage msg = (TextMessage) message;
- try {
- if (!msg.propertyExists(PRP_WORKER_ID)) {
- for (int i = 0; i < WORKER_INTERNAL_ITERATIONS; i++) {
- final TextMessage newMsg = session.createTextMessage(msg.getText());
- newMsg.setStringProperty(PRP_WORKER_ID, name + "-" + i);
- newMsg.setStringProperty(PRP_INITIAL_ID, msg.getStringProperty(PRP_INITIAL_ID));
- producer.send(newMsg);
- }
- }
- msg.acknowledge();
-
- }
- catch (JMSException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- finally {
- final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement();
- if (onMsgCounter % 1000 == 0) {
- System.out.println("message received: " + onMsgCounter);
- }
- LATCH.countDown();
- }
- }
-
- private void close() {
- if (session != null) {
- try {
- session.close();
- }
- catch (JMSException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = "tcp://0.0.0.0:61617";
- super.setUp();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- File dataFileDir = new File("target/test-amq-2512/datadb");
- IOHelper.mkdirs(dataFileDir);
- IOHelper.deleteChildren(dataFileDir);
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(dataFileDir);
- BrokerService answer = new BrokerService();
- answer.setPersistenceAdapter(kaha);
-
- kaha.setEnableJournalDiskSyncs(false);
- //kaha.setIndexCacheSize(10);
- answer.setDataDirectoryFile(dataFileDir);
- answer.setUseJmx(false);
- answer.addConnector(bindAddress);
- return answer;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
deleted file mode 100644
index eb25bdd..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.broker.jmx.ManagementContext;
-
-/**
- * This unit test verifies an issue when
- * javax.management.InstanceNotFoundException is thrown after subsequent startups when
- * managementContext createConnector="false"
- */
-public class AMQ2513Test extends TestCase {
-
- private BrokerService broker;
- private String connectionUri;
-
- void createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- broker = new BrokerService();
- broker.setBrokerName("localhost");
- broker.setUseJmx(true);
- broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
- broker.addConnector("tcp://localhost:0");
-
- ManagementContext ctx = new ManagementContext();
- //if createConnector == true everything is fine
- ctx.setCreateConnector(false);
- broker.setManagementContext(ctx);
-
- broker.start();
- broker.waitUntilStarted();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- public void testJmx() throws Exception {
- createBroker(true);
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- Connection connection = factory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(session.createQueue("test"));
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- connection.start();
-
- producer.send(session.createTextMessage("test123"));
-
- DestinationViewMBean dv = createView();
- assertTrue(dv.getQueueSize() > 0);
-
- connection.close();
-
- broker.stop();
- broker.waitUntilStopped();
-
- createBroker(false);
- factory = new ActiveMQConnectionFactory(connectionUri);
- connection = factory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(session.createQueue("test"));
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- connection.start();
- producer.send(session.createTextMessage("test123"));
- connection.close();
-
- dv = createView();
- assertTrue(dv.getQueueSize() > 0);
-
- broker.stop();
- broker.waitUntilStopped();
-
- }
-
- DestinationViewMBean createView() throws Exception {
- String domain = "org.apache.activemq";
- ObjectName name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
- "destinationType=Queue,destinationName=test");
- return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java
deleted file mode 100644
index 148ab32..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.region.Queue;
-import org.junit.Assert;
-
-/**
- * This test demonstrates a bug in which calling
- * Queue#removeMatchingMessages("") generates an exception, whereas the JMS
- * specification states that an empty selector is valid.
- */
-public class AMQ2528Test extends EmbeddedBrokerTestSupport {
-
- /**
- * Setup the test so that the destination is a queue.
- */
- @Override
- protected void setUp() throws Exception {
- useTopic = false;
- super.setUp();
- }
-
- /**
- * This test enqueues test messages to destination and then verifies that
- * {@link Queue#removeMatchingMessages("")} removes all the messages.
- */
- public void testRemoveMatchingMessages() throws Exception {
- final int NUM_MESSAGES = 100;
- final String MESSAGE_ID = "id";
-
- // Enqueue the test messages.
- Connection conn = createConnection();
- try {
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- for (int id = 0; id < NUM_MESSAGES; id++) {
- Message message = session.createMessage();
- message.setIntProperty(MESSAGE_ID, id);
- producer.send(message);
- }
- producer.close();
- session.close();
- }
- finally {
- conn.close();
- }
-
- // Verify that half of the messages can be removed by selector.
- Queue queue = (Queue) broker.getRegionBroker().getDestinations(destination).iterator().next();
-
- Assert.assertEquals(NUM_MESSAGES / 2, queue.removeMatchingMessages(MESSAGE_ID + " < " + NUM_MESSAGES / 2));
-
- // Verify that the remainder of the messages can be removed by empty
- // selector.
- Assert.assertEquals(NUM_MESSAGES - NUM_MESSAGES / 2, queue.removeMatchingMessages(""));
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java
deleted file mode 100644
index 0c3ef45..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-
-public class AMQ2571Test extends EmbeddedBrokerTestSupport {
-
- public void testTempQueueClosing() {
- try {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.bindAddress);
- connectionFactory.setAlwaysSyncSend(true);
-
- // First create session that will own the TempQueue
- Connection connectionA = connectionFactory.createConnection();
- connectionA.start();
-
- Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- TemporaryQueue tempQueue = sessionA.createTemporaryQueue();
-
- // Next, create session that will put messages on the queue.
- Connection connectionB = connectionFactory.createConnection();
- connectionB.start();
-
- Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Create a producer for connection B.
- final MessageProducer producerB = sessionB.createProducer(tempQueue);
- producerB.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- final TextMessage message = sessionB.createTextMessage("Testing AMQ TempQueue.");
-
- Thread sendingThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- long end = System.currentTimeMillis() + 5 * 60 * 1000;
- // wait for exception on send
- while (System.currentTimeMillis() < end) {
- producerB.send(message);
- }
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
-
- // Send 5000 messages.
- sendingThread.start();
- // Now close connection A. This will remove the TempQueue.
- connectionA.close();
- // Wait for the thread to finish.
- sendingThread.join(5 * 60 * 1000);
-
- // Sleep for a while to make sure that we should know that the
- // TempQueue is gone.
- //Thread.sleep(50);
-
- // Now we test if we are able to send again.
- try {
- producerB.send(message);
- fail("Involuntary recreated temporary queue.");
- }
- catch (JMSException e) {
- // Got exception, just as we wanted because the creator of
- // the TempQueue had closed the connection prior to the send.
- assertTrue("TempQueue does not exist anymore.", true);
- }
- }
- catch (Exception e) {
- fail("Unexpected exception " + e);
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = "vm://localhost";
- setAutoFail(true);
- super.setUp();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setPersistent(false);
- answer.setUseJmx(false);
- return answer;
- }
-}
\ No newline at end of file