You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:30 UTC
[21/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
deleted file mode 100644
index 3504c1f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.disk.journal.DataFile;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4212Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class);
-
- private BrokerService service;
- private String connectionUri;
- private ActiveMQConnectionFactory cf;
-
- private final int MSG_COUNT = 256;
-
- /**
-  * Starts a fresh broker before each test: existing messages are deleted on
-  * startup and index recovery is not forced.
-  *
-  * @throws Exception if the broker cannot be created or started
-  */
- @Before
- public void setUp() throws Exception {
-    createBroker(true, false);
- }
-
- /**
-  * Creates and starts the persistent, JMX-enabled broker named
-  * "InactiveSubTest" and points the connection factory at it through the VM
-  * transport.
-  *
-  * @param deleteAllMessages forwarded to {@code setDeleteAllMessagesOnStartup}
-  * @param recover forwarded to {@code setForceRecoverIndex} on the KahaDB adapter
-  * @throws Exception if the broker fails to start
-  */
- public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception {
-    service = new BrokerService();
-    service.setBrokerName("InactiveSubTest");
-    service.setDeleteAllMessagesOnStartup(deleteAllMessages);
-    service.setAdvisorySupport(false);
-    service.setPersistent(true);
-    service.setUseJmx(true);
-    service.setKeepDurableSubsActive(false);
-
-    // Journal files are capped at 10 KiB; checkpoint and cleanup both run
-    // every five seconds.
-    final long fiveSecondsInMillis = TimeUnit.SECONDS.toMillis(5);
-    final KahaDBPersistenceAdapter kahaDb = new KahaDBPersistenceAdapter();
-    kahaDb.setDirectory(new File("KahaDB"));
-    kahaDb.setJournalMaxFileLength(10 * 1024);
-    kahaDb.setCheckpointInterval(fiveSecondsInMillis);
-    kahaDb.setCleanupInterval(fiveSecondsInMillis);
-    kahaDb.setForceRecoverIndex(recover);
-    service.setPersistenceAdapter(kahaDb);
-
-    service.start();
-    service.waitUntilStarted();
-
-    // create=false: connect to the broker started above, never spawn a new one.
-    connectionUri = "vm://InactiveSubTest?create=false";
-    cf = new ActiveMQConnectionFactory(connectionUri);
- }
-
- private void restartBroker() throws Exception {
- stopBroker();
- createBroker(false, false);
- }
-
- private void recoverBroker() throws Exception {
- stopBroker();
- createBroker(false, true);
- }
-
- /**
-  * Stops the broker, waits for it to finish stopping and clears the field.
-  * Does nothing when no broker is currently held.
-  */
- @After
- public void stopBroker() throws Exception {
-    if (service == null) {
-       return;
-    }
-
-    service.stop();
-    service.waitUntilStopped();
-    service = null;
- }
-
- /**
-  * Fills the journal through a queue, creates an inactive durable
-  * subscription, restarts the broker, removes the queue and waits for the
-  * journal to shrink. It then checks that {@code MSG_COUNT} topic messages
-  * reach the durable subscription and that the subscription is still listed
-  * after a restart with forced index recovery.
-  */
- @Test
- public void testDirableSubPrefetchRecovered() throws Exception {
-
-    ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
-    ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
-
-    // Reads the 'service' field on every evaluation, so the same condition
-    // stays valid after the broker has been replaced by a restart.
-    final Wait.Condition oneInactiveDurableSub = new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
-          return subs != null && subs.length == 1;
-       }
-    };
-
-    // Send to a Queue to create some journal files
-    sendMessages(queue);
-
-    LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-    createInactiveDurableSub(topic);
-
-    assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
-
-    // Now send some more to the queue to create even more files.
-    sendMessages(queue);
-
-    LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-    assertTrue(getNumberOfJournalFiles() > 1);
-
-    LOG.info("Restarting the broker.");
-    restartBroker();
-    LOG.info("Restarted the broker.");
-
-    LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-    assertTrue(getNumberOfJournalFiles() > 1);
-
-    assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
-
-    // Clear out all queue data
-    service.getAdminView().removeQueue(queue.getQueueName());
-
-    // Wait first and build the message afterwards, so that a failure reports
-    // the file count seen after the wait rather than the count before it.
-    boolean journalCleaned = Wait.waitFor(new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return getNumberOfJournalFiles() <= 2;
-       }
-    }, TimeUnit.MINUTES.toMillis(2));
-    assertTrue("No more than two journal files expected, was " + getNumberOfJournalFiles(), journalCleaned);
-
-    LOG.info("Sending {} Messages to the Topic.", MSG_COUNT);
-    // Send some messages to the inactive destination
-    sendMessages(topic);
-
-    LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT);
-    assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic));
-
-    LOG.info("Recovering the broker.");
-    recoverBroker();
-    LOG.info("Recovered the broker.");
-
-    assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
- }
-
- /**
-  * Creates an inactive durable subscription, sends it one topic message and
-  * consumes that message while queue traffic fills the journal. After a plain
-  * restart and again after a restart with forced index recovery, it checks
-  * that no message is redelivered to the subscription. Finally it waits for
-  * the journal to shrink to a single file.
-  */
- @Test
- public void testDurableAcksNotDropped() throws Exception {
-
-    ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
-    ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
-
-    // Reads the 'service' field on every evaluation, so the same condition
-    // stays valid after the broker has been replaced by a restart.
-    final Wait.Condition oneInactiveDurableSub = new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
-          return subs != null && subs.length == 1;
-       }
-    };
-
-    // Create durable sub in first data file.
-    createInactiveDurableSub(topic);
-
-    assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
-
-    // Send to a Topic
-    sendMessages(topic, 1);
-
-    // Send to a Queue to create some journal files
-    sendMessages(queue);
-
-    LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-    // Consume all the Messages leaving acks behind.
-    consumeDurableMessages(topic, 1);
-
-    LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-    // Now send some more to the queue to create even more files.
-    sendMessages(queue);
-
-    LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles());
-    assertTrue(getNumberOfJournalFiles() > 1);
-
-    LOG.info("Restarting the broker.");
-    restartBroker();
-    LOG.info("Restarted the broker.");
-
-    LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-    assertTrue(getNumberOfJournalFiles() > 1);
-
-    assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
-
-    // Clear out all queue data
-    service.getAdminView().removeQueue(queue.getQueueName());
-
-    // Wait first and build the message afterwards, so that a failure reports
-    // the file count seen after the wait rather than the count before it.
-    boolean journalShrunk = Wait.waitFor(new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return getNumberOfJournalFiles() <= 3;
-       }
-    }, TimeUnit.MINUTES.toMillis(3));
-    assertTrue("No more than three journal files expected, was " + getNumberOfJournalFiles(), journalShrunk);
-
-    // See if we receive any message they should all be acked.
-    tryConsumeExpectNone(topic);
-
-    LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-    LOG.info("Recovering the broker.");
-    recoverBroker();
-    LOG.info("Recovered the broker.");
-
-    LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
-
-    assertTrue("Should have an inactive durable sub", Wait.waitFor(oneInactiveDurableSub));
-
-    // See if we receive any message they should all be acked.
-    tryConsumeExpectNone(topic);
-
-    boolean singleJournalFile = Wait.waitFor(new Wait.Condition() {
-       @Override
-       public boolean isSatisified() throws Exception {
-          return getNumberOfJournalFiles() == 1;
-       }
-    }, TimeUnit.MINUTES.toMillis(1));
-    assertTrue("Exactly one journal file expected, was " + getNumberOfJournalFiles(), singleJournalFile);
- }
-
- /**
-  * Counts the non-null entries in the file map of the broker's KahaDB journal.
-  *
-  * @return the number of non-null journal data files
-  * @throws IOException if the journal cannot be obtained from the store
-  */
- private int getNumberOfJournalFiles() throws IOException {
-    KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) service.getPersistenceAdapter();
-    Collection<DataFile> journalFiles = adapter.getStore().getJournal().getFileMap().values();
-
-    int nonNullCount = 0;
-    for (DataFile journalFile : journalFiles) {
-       nonNullCount += (journalFile != null) ? 1 : 0;
-    }
-    return nonNullCount;
- }
-
- /**
-  * Registers the durable subscription "Inactive" (client ID "Inactive") on
-  * the given topic and disconnects straight away, leaving it inactive.
-  *
-  * @param topic the topic to subscribe to
-  * @throws Exception if the connection, session or subscriber cannot be created
-  */
- private void createInactiveDurableSub(Topic topic) throws Exception {
-    Connection connection = cf.createConnection();
-    try {
-       connection.setClientID("Inactive");
-       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
-       consumer.close();
-    }
-    finally {
-       // Always release the connection so the client ID is not left in use.
-       connection.close();
-    }
- }
-
- /**
-  * Re-attaches to the durable subscription "Inactive" and receives exactly
-  * {@code count} messages, failing the test if any receive times out after
-  * ten seconds.
-  *
-  * @param topic the topic the durable subscription belongs to
-  * @param count the number of messages that must be received
-  * @throws Exception on any JMS failure
-  */
- private void consumeDurableMessages(Topic topic, int count) throws Exception {
-    Connection connection = cf.createConnection();
-    try {
-       connection.setClientID("Inactive");
-       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
-       connection.start();
-       for (int i = 0; i < count; ++i) {
-          if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) {
-             fail("should have received a message");
-          }
-       }
-       consumer.close();
-    }
-    finally {
-       // fail() throws, so without this the connection would stay open and
-       // keep the "Inactive" client ID in use.
-       connection.close();
-    }
- }
-
- /**
-  * Re-attaches to the durable subscription "Inactive" and fails the test if a
-  * message arrives within ten seconds.
-  *
-  * @param topic the topic the durable subscription belongs to
-  * @throws Exception on any JMS failure
-  */
- private void tryConsumeExpectNone(Topic topic) throws Exception {
-    Connection connection = cf.createConnection();
-    try {
-       connection.setClientID("Inactive");
-       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
-       connection.start();
-       if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) {
-          fail("Should be no messages for this durable.");
-       }
-       consumer.close();
-    }
-    finally {
-       // fail() throws, so without this the connection would stay open and
-       // keep the "Inactive" client ID in use.
-       connection.close();
-    }
- }
-
- /**
-  * Re-attaches to the durable subscription "Inactive" and drains it, stopping
-  * at the first receive that times out after ten seconds.
-  *
-  * @param topic the topic the durable subscription belongs to
-  * @return the number of messages received
-  * @throws Exception on any JMS failure
-  */
- private int consumeFromInactiveDurableSub(Topic topic) throws Exception {
-    Connection connection = cf.createConnection();
-    try {
-       connection.setClientID("Inactive");
-       connection.start();
-       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
-
-       int count = 0;
-
-       while (consumer.receive(10000) != null) {
-          count++;
-       }
-
-       consumer.close();
-       return count;
-    }
-    finally {
-       // Release the connection on failure as well, so the client ID is freed.
-       connection.close();
-    }
- }
-
- /**
-  * Sends {@code MSG_COUNT} persistent text messages to the destination.
-  *
-  * @param destination the queue or topic to send to
-  * @throws Exception on any JMS failure
-  */
- private void sendMessages(Destination destination) throws Exception {
-    sendMessages(destination, MSG_COUNT);
- }
-
- /**
-  * Sends {@code count} persistent text messages to the destination over a
-  * dedicated connection, which is closed afterwards.
-  *
-  * @param destination the queue or topic to send to
-  * @param count the number of messages to send
-  * @throws Exception on any JMS failure
-  */
- private void sendMessages(Destination destination, int count) throws Exception {
-    Connection connection = cf.createConnection();
-    try {
-       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       MessageProducer producer = session.createProducer(destination);
-       producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-       for (int i = 0; i < count; ++i) {
-          TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + destination);
-          producer.send(message);
-       }
-    }
-    finally {
-       // Close the connection even when a send fails part-way through.
-       connection.close();
-    }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java
deleted file mode 100644
index c033e97..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.fail;
-
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ProducerInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Checks that creating a producer fails with a {@link JMSException} on the
- * client when a broker plugin throws from {@code addProducer}.
- */
-public class AMQ4213Test {
-
-   private static final String BROKER_ADDRESS = "tcp://localhost:0";
-   private static final String TEST_QUEUE = "testQueue";
-   private static final ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
-
-   // Recreated in setUp() for every test, so it is instance state, not static.
-   private BrokerService brokerService;
-   private String connectionUri;
-
-   /**
-    * Starts a non-persistent broker on an ephemeral TCP port with a plugin
-    * that rejects every producer registration.
-    */
-   @Before
-   public void setUp() throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setPersistent(false);
-      brokerService.setUseJmx(true);
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
-
-      brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-
-         @Override
-         public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
-            throw new javax.jms.JMSSecurityException(connectionUri);
-         }
-      }});
-
-      brokerService.start();
-      brokerService.waitUntilStarted();
-   }
-
-   /**
-    * Stops the broker; does nothing if setUp() failed before creating it.
-    */
-   @After
-   public void tearDown() throws Exception {
-      if (brokerService != null) {
-         brokerService.stop();
-         brokerService.waitUntilStopped();
-         brokerService = null;
-      }
-   }
-
-   /**
-    * Creating a producer must throw, because the broker plugin rejects it.
-    */
-   @Test
-   public void testExceptionOnProducerCreateThrows() throws Exception {
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-
-      try {
-         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-         connection.start();
-
-         try {
-            session.createProducer(queue);
-            fail("Should not be able to create this producer.");
-         }
-         catch (JMSException expected) {
-            // The plugin throws from addProducer, so this is the outcome under test.
-         }
-      }
-      finally {
-         // The connection was never closed before; release it in every case.
-         connection.close();
-      }
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java
deleted file mode 100644
index 7433b18..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Adds, removes and re-adds a queue through the admin view of a broker that
- * stores each destination in its own KahaDB instance via
- * {@link MultiKahaDBPersistenceAdapter}.
- */
-public class AMQ4220Test {
-
-   static final Logger LOG = LoggerFactory.getLogger(AMQ4220Test.class);
-   private static final int MAX_FILE_LENGTH = 1024 * 1024 * 32;
-   private static final String DESTINATION_NAME = "TEST.QUEUE";
-   BrokerService broker;
-
-   /**
-    * Builds the multi-store broker with all stored messages deleted and starts it.
-    */
-   @Before
-   public void setUp() throws Exception {
-      prepareBrokerWithMultiStore(true);
-      broker.start();
-      broker.waitUntilStarted();
-   }
-
-   /**
-    * Stops the broker and waits for the stop to complete; does nothing if
-    * setUp() failed before the broker was created.
-    */
-   @After
-   public void tearDown() throws Exception {
-      if (broker != null) {
-         broker.stop();
-         broker.waitUntilStopped();
-         broker = null;
-      }
-   }
-
-   /**
-    * Creates, but does not start, a JMX-enabled broker named "localhost".
-    *
-    * @param kaha the persistence adapter the broker should use
-    * @return the configured broker
-    */
-   protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
-      // Named differently from the 'broker' field to avoid shadowing it.
-      BrokerService created = new BrokerService();
-      created.setUseJmx(true);
-      created.setBrokerName("localhost");
-      created.setPersistenceAdapter(kaha);
-      return created;
-   }
-
-   /**
-    * Adds a queue, removes it, adds it again and checks the broker can
-    * resolve the destination after each add.
-    */
-   @Test
-   public void testRestartAfterQueueDelete() throws Exception {
-
-      // Ensure we have an Admin View.
-      assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
-            return broker.getAdminView() != null;
-         }
-      }));
-
-      LOG.info("Adding initial destination: {}", DESTINATION_NAME);
-
-      broker.getAdminView().addQueue(DESTINATION_NAME);
-
-      assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
-
-      LOG.info("Removing initial destination: {}", DESTINATION_NAME);
-
-      broker.getAdminView().removeQueue(DESTINATION_NAME);
-
-      LOG.info("Adding back destination: {}", DESTINATION_NAME);
-
-      broker.getAdminView().addQueue(DESTINATION_NAME);
-
-      assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
-   }
-
-   /**
-    * Creates a KahaDB adapter with 32 MiB journal files and a five second
-    * cleanup interval.
-    *
-    * @param delete whether to delete all messages in the new store
-    * @return the configured adapter
-    */
-   protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
-      KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
-      kaha.setJournalMaxFileLength(MAX_FILE_LENGTH);
-      kaha.setCleanupInterval(5000);
-      if (delete) {
-         kaha.deleteAllMessages();
-      }
-      return kaha;
-   }
-
-   /**
-    * Assigns the {@code broker} field a broker backed by a multi-KahaDB
-    * adapter whose single filtered adapter is marked per-destination.
-    *
-    * @param deleteAllMessages whether to delete all messages in the stores
-    */
-   public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
-
-      MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
-      if (deleteAllMessages) {
-         multiKahaDBPersistenceAdapter.deleteAllMessages();
-      }
-      ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-
-      FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
-      template.setPersistenceAdapter(createStore(deleteAllMessages));
-      template.setPerDestination(true);
-      adapters.add(template);
-
-      multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
-      broker = createBroker(multiKahaDBPersistenceAdapter);
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java
deleted file mode 100644
index 0e9c488..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.HashSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.DestinationStatistics;
-import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
-import org.apache.activemq.util.DefaultTestAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.spi.LoggingEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4221Test extends TestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4221Test.class);
- public int PAYLOAD_SIZE_BYTES = 4 * 1024;
- public int NUM_TO_SEND = 60000;
- public int NUM_CONCURRENT_PRODUCERS = 20;
- public int QUEUE_COUNT = 1;
- public int TMP_JOURNAL_MAX_FILE_SIZE = 10 * 1024 * 1024;
-
- public int DLQ_PURGE_INTERVAL = 30000;
-
- public int MESSAGE_TIME_TO_LIVE = 20000;
- public int EXPIRE_SWEEP_PERIOD = 200;
- public int TMP_JOURNAL_GC_PERIOD = 50;
- public int RECEIVE_POLL_PERIOD = 4000;
- private int RECEIVE_BATCH = 5000;
-
- final byte[] payload = new byte[PAYLOAD_SIZE_BYTES];
- final AtomicInteger counter = new AtomicInteger(0);
- final HashSet<Throwable> exceptions = new HashSet<>();
- BrokerService brokerService;
- private String brokerUrlString;
- ExecutorService executorService = Executors.newCachedThreadPool();
- final AtomicBoolean done = new AtomicBoolean(false);
-
- /**
-  * Builds the JUnit 3 suite for this class through the inherited
-  * {@code suite(Class)} helper.
-  *
-  * @return the suite for {@code AMQ4221Test}
-  */
- public static Test suite() {
-    final Test generated = suite(AMQ4221Test.class);
-    return generated;
- }
-
- @Override
- public void setUp() throws Exception {
-
- LogManager.getRootLogger().addAppender(new DefaultTestAppender() {
-
- @Override
- public void doAppend(LoggingEvent event) {
- if (event.getLevel().isGreaterOrEqual(Level.ERROR)) {
- System.err.println("exit on error: " + event.getMessage());
- done.set(true);
- new Thread() {
- @Override
- public void run() {
- System.exit(787);
- }
- }.start();
- }
- }
- });
-
- done.set(false);
- brokerService = new BrokerService();
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue("ActiveMQ.DLQ")});
-
- PolicyEntry defaultPolicy = new PolicyEntry();
- defaultPolicy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
- defaultPolicy.setExpireMessagesPeriod(EXPIRE_SWEEP_PERIOD);
- defaultPolicy.setProducerFlowControl(false);
- defaultPolicy.setMemoryLimit(50 * 1024 * 1024);
-
- brokerService.getSystemUsage().getMemoryUsage().setLimit(50 * 1024 * 1024);
-
- PolicyMap destinationPolicyMap = new PolicyMap();
- destinationPolicyMap.setDefaultEntry(defaultPolicy);
- brokerService.setDestinationPolicy(destinationPolicyMap);
-
- PListStoreImpl tempDataStore = new PListStoreImpl();
- tempDataStore.setDirectory(brokerService.getTmpDataDirectory());
- tempDataStore.setJournalMaxFileLength(TMP_JOURNAL_MAX_FILE_SIZE);
- tempDataStore.setCleanupInterval(TMP_JOURNAL_GC_PERIOD);
- tempDataStore.setIndexPageSize(200);
- tempDataStore.setIndexEnablePageCaching(false);
-
- brokerService.setTempDataStore(tempDataStore);
- brokerService.setAdvisorySupport(false);
- TransportConnector tcp = brokerService.addConnector("tcp://localhost:0");
- brokerService.start();
- brokerUrlString = tcp.getPublishableConnectString();
- }
-
- @Override
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- executorService.shutdownNow();
- }
-
- /**
-  * Runs one produce/consume worker per queue while a background task
-  * periodically purges the DLQ, waits for every worker to finish and then
-  * asserts that no worker recorded an exception.
-  */
- public void testProduceConsumeExpireHalf() throws Exception {
-
-    final org.apache.activemq.broker.region.Queue dlq = (org.apache.activemq.broker.region.Queue) getDestination(brokerService, new ActiveMQQueue("ActiveMQ.DLQ"));
-
-    if (DLQ_PURGE_INTERVAL > 0) {
-       executorService.execute(new Runnable() {
-          @Override
-          public void run() {
-             while (!done.get()) {
-                try {
-                   Thread.sleep(DLQ_PURGE_INTERVAL);
-                   LOG.info("Purge DLQ, current size: {}", dlq.getDestinationStatistics().getMessages().getCount());
-                   dlq.purge();
-                }
-                catch (InterruptedException ignored) {
-                   // An interrupt is not an error here; the loop re-checks 'done'.
-                }
-                catch (Throwable e) {
-                   // Not logged through LOG.error: the appender installed in
-                   // setUp() exits the JVM on ERROR events.
-                   e.printStackTrace();
-                   // 'exceptions' is a plain HashSet shared between threads.
-                   synchronized (exceptions) {
-                      exceptions.add(e);
-                   }
-                }
-             }
-          }
-       });
-
-    }
-
-    final CountDownLatch latch = new CountDownLatch(QUEUE_COUNT);
-    for (int i = 0; i < QUEUE_COUNT; i++) {
-       final int id = i;
-       executorService.execute(new Runnable() {
-          @Override
-          public void run() {
-             try {
-                doProduceConsumeExpireHalf(id, latch);
-             }
-             catch (Throwable e) {
-                e.printStackTrace();
-                synchronized (exceptions) {
-                   exceptions.add(e);
-                }
-             }
-          }
-       });
-    }
-
-    // Poll in five second slices so that 'done' being set elsewhere (by the
-    // error appender from setUp()) also ends the wait.
-    while (!done.get()) {
-       done.set(latch.await(5, TimeUnit.SECONDS));
-    }
-    executorService.shutdown();
-    executorService.awaitTermination(5, TimeUnit.MINUTES);
-
-    synchronized (exceptions) {
-       assertTrue("no exceptions:" + exceptions, exceptions.isEmpty());
-    }
-
- }
-
- /**
-  * Drives one queue: a polling consumer selects messages whose "on" property
-  * is 'true', while {@code NUM_CONCURRENT_PRODUCERS} producers send
-  * non-persistent messages with a time-to-live until the shared send count
-  * reaches {@code NUM_TO_SEND} or {@code done} is set.
-  *
-  * @param id suffix of the queue name ("Q" + id)
-  * @param latch counted down once when this method finishes, whether or not
-  *           it failed, so the waiting test is never left hanging
-  * @throws Exception on any JMS failure while setting up the consumer
-  */
- public void doProduceConsumeExpireHalf(int id, CountDownLatch latch) throws Exception {
-    try {
-       final ActiveMQQueue queue = new ActiveMQQueue("Q" + id);
-
-       final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlString);
-       ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
-       prefetchPolicy.setAll(0);
-       factory.setPrefetchPolicy(prefetchPolicy);
-       // NOTE(review): this consumer connection is never closed here; the
-       // polling task below keeps using it until 'done' is set.
-       Connection connection = factory.createConnection();
-       connection.start();
-       final MessageConsumer consumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue, "on = 'true'");
-
-       executorService.execute(new Runnable() {
-          @Override
-          public void run() {
-             try {
-                while (!done.get()) {
-                   Thread.sleep(RECEIVE_POLL_PERIOD);
-                   for (int i = 0; i < RECEIVE_BATCH && !done.get(); i++) {
-
-                      Message message = consumer.receive(1000);
-                      if (message != null) {
-                         // Use the value returned by the increment; re-reading the
-                         // counter could observe another thread's update.
-                         int received = counter.incrementAndGet();
-                         if (received % 500 == 0) {
-                            LOG.info("received: " + received + ", " + message.getJMSDestination().toString());
-                         }
-                      }
-                   }
-                }
-             }
-             catch (JMSException ignored) {
-                // JMS failures in the polling consumer are deliberately not recorded.
-             }
-             catch (Exception e) {
-                // Not logged through LOG.error: the appender installed in
-                // setUp() exits the JVM on ERROR events.
-                e.printStackTrace();
-                synchronized (exceptions) {
-                   exceptions.add(e);
-                }
-             }
-          }
-       });
-
-       final AtomicInteger accumulator = new AtomicInteger(0);
-       final CountDownLatch producersDone = new CountDownLatch(NUM_CONCURRENT_PRODUCERS);
-
-       for (int i = 0; i < NUM_CONCURRENT_PRODUCERS; i++) {
-          executorService.execute(new Runnable() {
-             @Override
-             public void run() {
-                Connection sendConnection = null;
-                try {
-                   sendConnection = factory.createConnection();
-                   sendConnection.start();
-                   Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                   MessageProducer producer = sendSession.createProducer(queue);
-                   producer.setTimeToLive(MESSAGE_TIME_TO_LIVE);
-                   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
-                   int sequence;
-                   while ((sequence = accumulator.incrementAndGet()) < NUM_TO_SEND && !done.get()) {
-                      BytesMessage message = sendSession.createBytesMessage();
-                      message.writeBytes(payload);
-                      // Parity of this producer's own sequence number, not of a
-                      // later read that other producers may have advanced.
-                      message.setStringProperty("on", String.valueOf(sequence % 2 == 0));
-                      producer.send(message);
-
-                   }
-                }
-                catch (Exception e) {
-                   e.printStackTrace();
-                   synchronized (exceptions) {
-                      exceptions.add(e);
-                   }
-                }
-                finally {
-                   // Count down on failure too, so the wait below does not
-                   // run to its ten minute timeout.
-                   producersDone.countDown();
-                   if (sendConnection != null) {
-                      try {
-                         sendConnection.close();
-                      }
-                      catch (JMSException ignored) {
-                         // Best-effort cleanup; this producer has finished.
-                      }
-                   }
-                }
-             }
-          });
-       }
-
-       producersDone.await(10, TimeUnit.MINUTES);
-
-       final DestinationStatistics view = getDestinationStatistics(brokerService, queue);
-       LOG.info("total expired so far " + view.getExpired().getCount() + ", " + queue.getQueueName());
-    }
-    finally {
-       latch.countDown();
-    }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
deleted file mode 100644
index adaf0e5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.TransportConnection;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.transport.vm.VMTransportFactory;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
- */
-public class AMQ4222Test extends TestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4222Test.class);
-
- protected BrokerService brokerService;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- topic = false;
- brokerService = createBroker();
- }
-
- @Override
- protected void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
- broker.start();
- broker.waitUntilStarted();
- return broker;
- }
-
- @Override
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory("vm://localhost");
- }
-
- public void testTempQueueCleanedUp() throws Exception {
-
- Destination requestQueue = createDestination();
-
- Connection producerConnection = createConnection();
- producerConnection.start();
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer producer = producerSession.createProducer(requestQueue);
- Destination replyTo = producerSession.createTemporaryQueue();
- MessageConsumer producerSessionConsumer = producerSession.createConsumer(replyTo);
-
- final CountDownLatch countDownLatch = new CountDownLatch(1);
- // let's listen to the response on the queue
- producerSessionConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- if (message instanceof TextMessage) {
- LOG.info("You got a message: " + ((TextMessage) message).getText());
- countDownLatch.countDown();
- }
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
-
- producer.send(createRequest(producerSession, replyTo));
-
- Connection consumerConnection = createConnection();
- consumerConnection.start();
- Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSession.createConsumer(requestQueue);
- final MessageProducer consumerProducer = consumerSession.createProducer(null);
-
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- consumerProducer.send(message.getJMSReplyTo(), message);
- }
- catch (JMSException e) {
- LOG.error("error sending a response on the temp queue");
- e.printStackTrace();
- }
- }
- });
-
- countDownLatch.await(2, TimeUnit.SECONDS);
-
- // producer has not gone away yet...
- org.apache.activemq.broker.region.Destination tempDestination = getDestination(brokerService, (ActiveMQDestination) replyTo);
- assertNotNull(tempDestination);
-
- // clean up
- producer.close();
- producerSession.close();
- producerConnection.close();
-
- // producer has gone away.. so the temp queue should not exist anymore... let's see..
- // producer has not gone away yet...
- tempDestination = getDestination(brokerService, (ActiveMQDestination) replyTo);
- assertNull(tempDestination);
-
- // now.. the connection on the broker side for the dude producing to the temp dest will
- // still have a reference in his producerBrokerExchange.. this will keep the destination
- // from being reclaimed by GC if there is never another send that producer makes...
- // let's see if that reference is there...
- final TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost");
- assertNotNull(connector);
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return connector.getConnections().size() == 1;
- }
- }));
- TransportConnection transportConnection = connector.getConnections().get(0);
- Map<ProducerId, ProducerBrokerExchange> exchanges = getProducerExchangeFromConn(transportConnection);
- assertEquals(1, exchanges.size());
- ProducerBrokerExchange exchange = exchanges.values().iterator().next();
-
- // so this is the reason for the test... we don't want these exchanges to hold a reference
- // to a region destination.. after a send is completed, the destination is not used anymore on
- // a producer exchange
- assertNull(exchange.getRegionDestination());
- assertNull(exchange.getRegion());
-
- }
-
- @SuppressWarnings("unchecked")
- private Map<ProducerId, ProducerBrokerExchange> getProducerExchangeFromConn(TransportConnection transportConnection) throws NoSuchFieldException, IllegalAccessException {
- Field f = TransportConnection.class.getDeclaredField("producerExchanges");
- f.setAccessible(true);
- Map<ProducerId, ProducerBrokerExchange> producerExchanges = (Map<ProducerId, ProducerBrokerExchange>) f.get(transportConnection);
- return producerExchanges;
- }
-
- private Message createRequest(Session session, Destination replyTo) throws JMSException {
- Message message = session.createTextMessage("Payload");
- message.setJMSReplyTo(replyTo);
- return message;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
deleted file mode 100644
index 415dad6..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.ConsumerThread;
-import org.apache.activemq.util.ProducerThread;
-import org.apache.activemq.util.Wait;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class AMQ4323Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4323Test.class);
-
- BrokerService broker = null;
- File kahaDbDir = null;
- private final Destination destination = new ActiveMQQueue("q");
- final String payload = new String(new byte[1024]);
-
- protected void startBroker(boolean delete) throws Exception {
- broker = new BrokerService();
-
- //Start with a clean directory
- kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB");
- deleteDir(kahaDbDir);
-
- broker.setSchedulerSupport(false);
- broker.setDeleteAllMessagesOnStartup(delete);
- broker.setPersistent(true);
- broker.setUseJmx(false);
- broker.addConnector("tcp://localhost:0");
-
- PolicyMap map = new PolicyMap();
- PolicyEntry entry = new PolicyEntry();
- entry.setUseCache(false);
- map.setDefaultEntry(entry);
- broker.setDestinationPolicy(map);
-
- configurePersistence(broker, delete);
-
- broker.start();
- LOG.info("Starting broker..");
- }
-
- protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
- KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
-
- // ensure there are a bunch of data files but multiple entries in each
- adapter.setJournalMaxFileLength(1024 * 20);
-
- // speed up the test case, checkpoint and cleanup early and often
- adapter.setCheckpointInterval(500);
- adapter.setCleanupInterval(500);
-
- if (!deleteAllOnStart) {
- adapter.setForceRecoverIndex(true);
- }
-
- }
-
- private boolean deleteDir(File dir) {
- if (dir.isDirectory()) {
- String[] children = dir.list();
- for (int i = 0; i < children.length; i++) {
- boolean success = deleteDir(new File(dir, children[i]));
- if (!success) {
- return false;
- }
- }
- }
-
- return dir.delete();
- }
-
- private int getFileCount(File dir) {
- if (dir.isDirectory()) {
- String[] children = dir.list();
- return children.length;
- }
-
- return 0;
- }
-
- @Test
- public void testCleanupOfFiles() throws Exception {
- final int messageCount = 500;
- startBroker(true);
- int fileCount = getFileCount(kahaDbDir);
- assertEquals(4, fileCount);
-
- Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
- connection.start();
- Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- ProducerThread producer = new ProducerThread(producerSess, destination) {
- @Override
- protected Message createMessage(int i) throws Exception {
- return sess.createTextMessage(payload + "::" + i);
- }
- };
- producer.setMessageCount(messageCount);
- ConsumerThread consumer = new ConsumerThread(consumerSess, destination);
- consumer.setBreakOnNull(false);
- consumer.setMessageCount(messageCount);
-
- producer.start();
- producer.join();
-
- consumer.start();
- consumer.join();
-
- assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
-
- // verify cleanup
- assertTrue("gc worked", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- int fileCount = getFileCount(kahaDbDir);
- LOG.info("current filecount:" + fileCount);
- return 4 == fileCount;
- }
- }));
-
- broker.stop();
- broker.waitUntilStopped();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
deleted file mode 100644
index 3d4ec84..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4356Test {
-
- private static BrokerService brokerService;
- private static String BROKER_ADDRESS = "tcp://localhost:0";
-
- private String connectionUri;
- private ActiveMQConnectionFactory cf;
- private final String CLIENT_ID = "AMQ4356Test";
- private final String SUBSCRIPTION_NAME = "AMQ4356Test";
-
- private void createBroker(boolean deleteOnStart) throws Exception {
- brokerService = new BrokerService();
- brokerService.setUseJmx(true);
- brokerService.setDeleteAllMessagesOnStartup(deleteOnStart);
- connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
- brokerService.start();
- brokerService.waitUntilStarted();
-
- }
-
- private void startBroker() throws Exception {
- createBroker(true);
- }
-
- private void restartBroker() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- createBroker(false);
- }
-
- @Before
- public void setUp() throws Exception {
- startBroker();
- cf = new ActiveMQConnectionFactory(connectionUri);
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- @Test
- public void testVirtualTopicUnsubDurable() throws Exception {
- Connection connection = cf.createConnection();
- connection.setClientID(CLIENT_ID);
- connection.start();
-
- // create consumer 'cluster'
- ActiveMQQueue queue1 = new ActiveMQQueue(getVirtualTopicConsumerName());
- ActiveMQQueue queue2 = new ActiveMQQueue(getVirtualTopicConsumerName());
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c1 = session.createConsumer(queue1);
- c1.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- }
- });
- MessageConsumer c2 = session.createConsumer(queue2);
- c2.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- }
- });
-
- ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName());
- MessageConsumer c3 = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
-
- assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
- assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
-
- c3.close();
-
- // create topic producer
- MessageProducer producer = session.createProducer(topic);
- assertNotNull(producer);
-
- int total = 10;
- for (int i = 0; i < total; i++) {
- producer.send(session.createTextMessage("message: " + i));
- }
-
- assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
- assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
-
- session.unsubscribe(SUBSCRIPTION_NAME);
- connection.close();
-
- assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
- assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
-
- restartBroker();
-
- assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
- assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
- }
-
- protected String getVirtualTopicName() {
- return "VirtualTopic.TEST";
- }
-
- protected String getVirtualTopicConsumerName() {
- return "Consumer.A.VirtualTopic.TEST";
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
deleted file mode 100644
index 27c4f64..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4361Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4361Test.class);
-
- private BrokerService service;
- private String brokerUrlString;
-
- @Before
- public void setUp() throws Exception {
- service = new BrokerService();
- service.setDeleteAllMessagesOnStartup(true);
- service.setUseJmx(false);
-
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry policy = new PolicyEntry();
- policy.setMemoryLimit(1);
- policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
- policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
- policy.setProducerFlowControl(true);
- policyMap.setDefaultEntry(policy);
- service.setDestinationPolicy(policyMap);
-
- service.setAdvisorySupport(false);
- brokerUrlString = service.addConnector("tcp://localhost:0").getPublishableConnectString();
- service.start();
- service.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- if (service != null) {
- service.stop();
- service.waitUntilStopped();
- }
- }
-
- @Test
- public void testCloseWhenHunk() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrlString);
- connectionFactory.setProducerWindowSize(1024);
-
- // TINY QUEUE is flow controlled after 1024 bytes
- final ActiveMQDestination destination = ActiveMQDestination.createDestination("queue://TINY_QUEUE", (byte) 0xff);
-
- Connection connection = connectionFactory.createConnection();
- connection.start();
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageProducer producer = session.createProducer(destination);
- producer.setTimeToLive(0);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- final AtomicReference<Exception> publishException = new AtomicReference<>(null);
- final AtomicReference<Exception> closeException = new AtomicReference<>(null);
- final AtomicLong lastLoop = new AtomicLong(System.currentTimeMillis() + 100);
-
- Thread pubThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- byte[] data = new byte[1000];
- new Random(0xdeadbeef).nextBytes(data);
- for (int i = 0; i < 10000; i++) {
- lastLoop.set(System.currentTimeMillis());
- ObjectMessage objMsg = session.createObjectMessage();
- objMsg.setObject(data);
- producer.send(destination, objMsg);
- }
- }
- catch (Exception e) {
- publishException.set(e);
- }
- }
- }, "PublishingThread");
- pubThread.start();
-
- // wait for publisher to deadlock
- while (System.currentTimeMillis() - lastLoop.get() < 2000) {
- Thread.sleep(100);
- }
- LOG.info("Publisher deadlock detected.");
-
- Thread closeThread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- LOG.info("Attempting close..");
- producer.close();
- }
- catch (Exception e) {
- closeException.set(e);
- }
- }
- }, "ClosingThread");
- closeThread.start();
-
- try {
- closeThread.join(30000);
- }
- catch (InterruptedException ie) {
- assertFalse("Closing thread didn't complete in 10 seconds", true);
- }
-
- try {
- pubThread.join(30000);
- }
- catch (InterruptedException ie) {
- assertFalse("Publishing thread didn't complete in 10 seconds", true);
- }
-
- assertNull(closeException.get());
- assertNotNull(publishException.get());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
deleted file mode 100644
index ef53a0a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4368Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4368Test.class);
-
- private BrokerService broker;
- private ActiveMQConnectionFactory connectionFactory;
- private final Destination destination = new ActiveMQQueue("large_message_queue");
- private String connectionUri;
-
- @Before
- public void setUp() throws Exception {
- broker = createBroker();
- connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
- broker.start();
- connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
-
- PolicyEntry policy = new PolicyEntry();
- policy.setUseCache(false);
- broker.setDestinationPolicy(new PolicyMap());
- broker.getDestinationPolicy().setDefaultEntry(policy);
-
- KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
- kahadb.setCheckForCorruptJournalFiles(true);
- kahadb.setCleanupInterval(1000);
-
- kahadb.deleteAllMessages();
- broker.setPersistenceAdapter(kahadb);
- broker.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 100);
- broker.setUseJmx(false);
-
- return broker;
- }
-
- abstract class Client implements Runnable {
-
- private final String name;
- final AtomicBoolean done = new AtomicBoolean();
- CountDownLatch startedLatch;
- CountDownLatch doneLatch = new CountDownLatch(1);
- Connection connection;
- Session session;
- final AtomicLong size = new AtomicLong();
-
- Client(String name, CountDownLatch startedLatch) {
- this.name = name;
- this.startedLatch = startedLatch;
- }
-
- public void start() {
- LOG.info("Starting: " + name);
- new Thread(this, name).start();
- }
-
- public void stopAsync() {
- done.set(true);
- }
-
- public void stop() throws InterruptedException {
- stopAsync();
- if (!doneLatch.await(20, TimeUnit.MILLISECONDS)) {
- try {
- connection.close();
- doneLatch.await();
- }
- catch (Exception e) {
- }
- }
- }
-
- @Override
- public void run() {
- try {
- connection = createConnection();
- connection.start();
- try {
- session = createSession();
- work();
- }
- finally {
- try {
- connection.close();
- }
- catch (JMSException ignore) {
- }
- LOG.info("Stopped: " + name);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- done.set(true);
- }
- finally {
- doneLatch.countDown();
- }
- }
-
- protected Session createSession() throws JMSException {
- return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- protected Connection createConnection() throws JMSException {
- return connectionFactory.createConnection();
- }
-
- abstract protected void work() throws Exception;
- }
-
- class ProducingClient extends Client {
-
- ProducingClient(String name, CountDownLatch startedLatch) {
- super(name, startedLatch);
- }
-
- private String createMessage() {
- StringBuffer stringBuffer = new StringBuffer();
- for (long i = 0; i < 1000000; i++) {
- stringBuffer.append("1234567890");
- }
- return stringBuffer.toString();
- }
-
- @Override
- protected void work() throws Exception {
- String data = createMessage();
- MessageProducer producer = session.createProducer(destination);
- startedLatch.countDown();
- while (!done.get()) {
- producer.send(session.createTextMessage(data));
- long i = size.incrementAndGet();
- if ((i % 1000) == 0) {
- LOG.info("produced " + i + ".");
- }
- }
- }
- }
-
- class ConsumingClient extends Client {
-
- public ConsumingClient(String name, CountDownLatch startedLatch) {
- super(name, startedLatch);
- }
-
- @Override
- protected void work() throws Exception {
- MessageConsumer consumer = session.createConsumer(destination);
- startedLatch.countDown();
- while (!done.get()) {
- Message msg = consumer.receive(100);
- if (msg != null) {
- size.incrementAndGet();
- }
- }
- }
- }
-
- @Test
- public void testENTMQ220() throws Exception {
- LOG.info("Start test.");
- CountDownLatch producer1Started = new CountDownLatch(1);
- CountDownLatch producer2Started = new CountDownLatch(1);
- CountDownLatch listener1Started = new CountDownLatch(1);
-
- final ProducingClient producer1 = new ProducingClient("1", producer1Started);
- final ProducingClient producer2 = new ProducingClient("2", producer2Started);
- final ConsumingClient listener1 = new ConsumingClient("subscriber-1", listener1Started);
- final AtomicLong lastSize = new AtomicLong();
-
- try {
-
- producer1.start();
- producer2.start();
- listener1.start();
-
- producer1Started.await(15, TimeUnit.SECONDS);
- producer2Started.await(15, TimeUnit.SECONDS);
- listener1Started.await(15, TimeUnit.SECONDS);
-
- lastSize.set(listener1.size.get());
- for (int i = 0; i < 10; i++) {
- Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return listener1.size.get() > lastSize.get();
- }
- });
- long size = listener1.size.get();
- LOG.info("Listener 1: consumed: " + (size - lastSize.get()));
- assertTrue("No messages received on iteration: " + i, size > lastSize.get());
- lastSize.set(size);
- }
- }
- finally {
- LOG.info("Stopping clients");
- producer1.stop();
- producer2.stop();
- listener1.stop();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
deleted file mode 100644
index 38a9398..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4407Test {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ4407Test.class);
- private final static int maxFileLength = 1024 * 1024 * 32;
-
- private final static String PREFIX_DESTINATION_NAME = "queue";
-
- private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test";
- private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test";
- private final static String DESTINATION_NAME_3 = PREFIX_DESTINATION_NAME + "3.test";
-
- BrokerService broker;
-
- @Before
- public void setUp() throws Exception {
- prepareBrokerWithMultiStore(true);
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-
- protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
- BrokerService broker = new BrokerService();
- broker.setUseJmx(true);
- broker.setBrokerName("localhost");
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- @Test
- public void testRestartAfterQueueDelete() throws Exception {
-
- // Ensure we have an Admin View.
- assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return (broker.getAdminView()) != null;
- }
- }));
-
- LOG.info("Adding destinations: {}, {}, {}", new Object[]{DESTINATION_NAME, DESTINATION_NAME_3, DESTINATION_NAME_3});
- sendMessage(DESTINATION_NAME, "test 1");
- sendMessage(DESTINATION_NAME_2, "test 1");
- sendMessage(DESTINATION_NAME_3, "test 1");
-
- assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
- assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
- assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3)));
-
- LOG.info("Removing destination: {}", DESTINATION_NAME_2);
- broker.getAdminView().removeQueue(DESTINATION_NAME_2);
-
- LOG.info("Recreating destination: {}", DESTINATION_NAME_2);
- sendMessage(DESTINATION_NAME_2, "test 1");
-
- Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
- assertNotNull(destination2);
- assertEquals(1, destination2.getMessageStore().getMessageCount());
- }
-
- protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
- KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
- kaha.setJournalMaxFileLength(maxFileLength);
- kaha.setCleanupInterval(5000);
- if (delete) {
- kaha.deleteAllMessages();
- }
- return kaha;
- }
-
- public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
-
- MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
- if (deleteAllMessages) {
- multiKahaDBPersistenceAdapter.deleteAllMessages();
- }
- ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-
- adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages));
- adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages));
- adapters.add(createFilteredKahaDBByDestinationPrefix(null, deleteAllMessages));
-
- multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
- broker = createBroker(multiKahaDBPersistenceAdapter);
- }
-
- /**
- * Create filtered KahaDB adapter by destination prefix.
- *
- * @param destinationPrefix
- * @param deleteAllMessages
- * @return
- * @throws IOException
- */
- private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix,
- boolean deleteAllMessages) throws IOException {
- FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
- template.setPersistenceAdapter(createStore(deleteAllMessages));
- if (destinationPrefix != null) {
- template.setQueue(destinationPrefix + ".>");
- }
- return template;
- }
-
- /**
- * Send message to particular destination.
- *
- * @param destinationName
- * @param message
- * @throws JMSException
- */
- private void sendMessage(String destinationName, String message) throws JMSException {
- ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
- f.setAlwaysSyncSend(true);
- Connection c = f.createConnection();
- c.start();
- Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName));
- producer.send(s.createTextMessage(message));
- producer.close();
- s.close();
- c.stop();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
deleted file mode 100644
index cd3ed95..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class AMQ4413Test {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ4413Test.class);
-
- final String brokerUrl = "tcp://localhost:0";
- private String connectionUri;
- final int numMsgsTriggeringReconnection = 2;
- final int numMsgs = 30;
- final int numTests = 75;
- final ExecutorService threadPool = Executors.newCachedThreadPool();
-
- @Test
- public void testDurableSubMessageLoss() throws Exception {
- // start embedded broker
- BrokerService brokerService = new BrokerService();
- connectionUri = brokerService.addConnector(brokerUrl).getPublishableConnectString();
- brokerService.setPersistent(false);
- brokerService.setUseJmx(false);
- brokerService.setKeepDurableSubsActive(true);
- brokerService.setAdvisorySupport(false);
- brokerService.start();
- LOG.info("##### broker started");
-
- // repeat test 50 times
- try {
- for (int i = 0; i < numTests; ++i) {
- LOG.info("##### test " + i + " started");
- test();
- }
-
- LOG.info("##### tests are done");
- }
- catch (Exception e) {
- e.printStackTrace();
- LOG.info("##### tests failed!");
- }
- finally {
- threadPool.shutdown();
- brokerService.stop();
- LOG.info("##### broker stopped");
- }
- }
-
- private void test() throws Exception {
-
- final String topicName = "topic-" + UUID.randomUUID();
- final String clientId = "client-" + UUID.randomUUID();
- final String subName = "sub-" + UUID.randomUUID();
-
- // create (and only create) subscription first
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- factory.setWatchTopicAdvisories(false);
- Connection connection = factory.createConnection();
- connection.setClientID(clientId);
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(topicName);
- TopicSubscriber durableSubscriptionCreator = session.createDurableSubscriber(topic, subName);
-
- connection.stop();
- durableSubscriptionCreator.close();
- session.close();
- connection.close();
-
- // publisher task
- Callable<Boolean> publisher = new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- Connection connection = null;
-
- try {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- factory.setWatchTopicAdvisories(false);
- connection = factory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(topicName);
-
- MessageProducer producer = session.createProducer(topic);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- producer.setPriority(Message.DEFAULT_PRIORITY);
- producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
-
- for (int seq = 1; seq <= numMsgs; ++seq) {
- TextMessage msg = session.createTextMessage(String.valueOf(seq));
- producer.send(msg);
- LOG.info("pub sent msg: " + seq);
- Thread.sleep(1L);
- }
-
- LOG.info("pub is done");
- }
- finally {
- if (connection != null) {
- try {
- connection.close();
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }
- return Boolean.TRUE;
- }
- };
-
- // subscriber task
- Callable<Boolean> durableSubscriber = new Callable<Boolean>() {
- ActiveMQConnectionFactory factory;
- Connection connection;
- Session session;
- Topic topic;
- TopicSubscriber consumer;
-
- @Override
- public Boolean call() throws Exception {
- factory = new ActiveMQConnectionFactory(connectionUri);
- factory.setWatchTopicAdvisories(false);
-
- try {
- connect();
-
- for (int seqExpected = 1; seqExpected <= numMsgs; ++seqExpected) {
- TextMessage msg = (TextMessage) consumer.receive(3000L);
- if (msg == null) {
- LOG.info("expected: " + seqExpected + ", actual: timed out", msg);
- return Boolean.FALSE;
- }
-
- int seq = Integer.parseInt(msg.getText());
-
- LOG.info("sub received msg: " + seq);
-
- if (seqExpected != seq) {
- LOG.info("expected: " + seqExpected + ", actual: " + seq);
- return Boolean.FALSE;
- }
-
- if (seq % numMsgsTriggeringReconnection == 0) {
- close(false);
- connect();
-
- LOG.info("sub reconnected");
- }
- }
-
- LOG.info("sub is done");
- }
- finally {
- try {
- close(true);
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- return Boolean.TRUE;
- }
-
- void connect() throws Exception {
- connection = factory.createConnection();
- connection.setClientID(clientId);
- connection.start();
-
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- topic = session.createTopic(topicName);
- consumer = session.createDurableSubscriber(topic, subName);
- }
-
- void close(boolean unsubscribe) throws Exception {
- if (connection != null) {
- connection.stop();
- }
-
- if (consumer != null) {
- consumer.close();
- }
-
- if (session != null) {
- if (unsubscribe) {
- session.unsubscribe(subName);
- }
- session.close();
- }
-
- if (connection != null) {
- connection.close();
- }
- }
- };
-
- ArrayList<Future<Boolean>> results = new ArrayList<>();
- results.add(threadPool.submit(publisher));
- results.add(threadPool.submit(durableSubscriber));
-
- for (Future<Boolean> result : results) {
- assertTrue(result.get());
- }
- }
-}