You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/16 02:53:47 UTC
[23/60] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java
deleted file mode 100644
index 9d79a8e..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-
-public class AMQ2580Test extends TestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2580Test.class);
-
- private static final String TOPIC_NAME = "topicName";
- private static final String CLIENT_ID = "client_id";
- private static final String textOfSelectedMsg = "good_message";
-
- protected TopicConnection connection;
-
- private Topic topic;
- private Session session;
- private MessageProducer producer;
- private ConnectionFactory connectionFactory;
- private BrokerService service;
-
- public static Test suite() {
- return suite(AMQ2580Test.class);
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- initDurableBroker();
- initConnectionFactory();
- initTopic();
- }
-
- @Override
- protected void tearDown() throws Exception {
- shutdownClient();
- service.stop();
- super.tearDown();
- }
-
- private void initConnection() throws JMSException {
- if (connection == null) {
- LOG.info("Initializing connection");
-
- connection = (TopicConnection) connectionFactory.createConnection();
- connection.start();
- }
- }
-
- public void initCombosForTestTopicIsDurableSmokeTest() throws Exception {
- addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
- }
-
- public void testTopicIsDurableSmokeTest() throws Exception {
-
- initClient();
- MessageConsumer consumer = createMessageConsumer();
- LOG.info("Consuming message");
- assertNull(consumer.receive(1));
- shutdownClient();
- consumer.close();
-
- sendMessages();
- shutdownClient();
-
- initClient();
- consumer = createMessageConsumer();
-
- LOG.info("Consuming message");
- TextMessage answer1 = (TextMessage) consumer.receive(1000);
- assertNotNull("we got our message", answer1);
-
- consumer.close();
- }
-
- private MessageConsumer createMessageConsumer() throws JMSException {
- LOG.info("creating durable subscriber");
- return session.createDurableSubscriber(topic, TOPIC_NAME, "name='value'", false);
- }
-
- private void initClient() throws JMSException {
- LOG.info("Initializing client");
-
- initConnection();
- initSession();
- }
-
- private void shutdownClient() throws JMSException {
- LOG.info("Closing session and connection");
- session.close();
- connection.close();
- session = null;
- connection = null;
- }
-
- private void sendMessages() throws JMSException {
- initConnection();
-
- initSession();
-
- LOG.info("Creating producer");
- producer = session.createProducer(topic);
-
- sendMessageThatFailsSelection();
-
- sendMessage(textOfSelectedMsg, "value");
- }
-
- private void initSession() throws JMSException {
- LOG.info("Initializing session");
- session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- private void sendMessageThatFailsSelection() throws JMSException {
- for (int i = 0; i < 5; i++) {
- String textOfNotSelectedMsg = "Msg_" + i;
- sendMessage(textOfNotSelectedMsg, "not_value");
- LOG.info("#");
- }
- }
-
- private void sendMessage(String msgText, String propertyValue) throws JMSException {
- LOG.info("Creating message: " + msgText);
- TextMessage messageToSelect = session.createTextMessage(msgText);
- messageToSelect.setStringProperty("name", propertyValue);
- LOG.info("Sending message");
- producer.send(messageToSelect);
- }
-
- protected void initConnectionFactory() throws Exception {
- ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory();
- connectionFactory = activeMqConnectionFactory;
- }
-
- private ActiveMQConnectionFactory createActiveMqConnectionFactory() throws Exception {
- ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory("failover:" + service.getTransportConnectors().get(0).getConnectUri().toString());
- activeMqConnectionFactory.setWatchTopicAdvisories(false);
- ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
- prefetchPolicy.setDurableTopicPrefetch(2);
- prefetchPolicy.setOptimizeDurableTopicPrefetch(2);
- activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy);
- activeMqConnectionFactory.setClientID(CLIENT_ID);
- return activeMqConnectionFactory;
- }
-
- private void initDurableBroker() throws Exception {
- service = new BrokerService();
- setDefaultPersistenceAdapter(service);
- service.setDeleteAllMessagesOnStartup(true);
- service.setAdvisorySupport(false);
- service.setTransportConnectorURIs(new String[]{"tcp://localhost:0"});
- service.setPersistent(true);
- service.setUseJmx(false);
- service.start();
-
- }
-
- private void initTopic() throws JMSException {
- initConnection();
- TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- topic = topicSession.createTopic(TOPIC_NAME);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
deleted file mode 100644
index 3b7a11b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// variation on AMQ2584 where the DLQ consumer works in parallel to producer so
-// that some dups are not suppressed as they are already acked by the consumer
-// the audit needs to be disabled to allow these dupes to be consumed
-public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ2584ConcurrentDlqTest.class);
- BrokerService broker = null;
- ActiveMQTopic topic;
-
- ActiveMQConnection consumerConnection = null, producerConnection = null, dlqConnection = null;
- Session consumerSession;
- Session producerSession;
- MessageProducer producer;
- Vector<TopicSubscriber> duralbeSubs = new Vector<>();
- final int numMessages = 1000;
- final int numDurableSubs = 2;
-
- String data;
- private long dlqConsumerLastReceivedTimeStamp;
- private AtomicLong dlqReceivedCount = new AtomicLong(0);
-
- // 2 deliveries of each message, 3 producers
- CountDownLatch redeliveryConsumerLatch = new CountDownLatch(((2 * numMessages) * numDurableSubs) - 1);
- // should get at least numMessages, possibly more
- CountDownLatch dlqConsumerLatch = new CountDownLatch((numMessages - 1));
-
- public void testSize() throws Exception {
- openConsumer(redeliveryConsumerLatch);
- openDlqConsumer(dlqConsumerLatch);
-
- assertEquals(0, broker.getAdminView().getStorePercentUsage());
-
- for (int i = 0; i < numMessages; i++) {
- sendMessage(false);
- }
-
- final BrokerView brokerView = broker.getAdminView();
-
- broker.getSystemUsage().getStoreUsage().isFull();
- LOG.info("store percent usage: " + brokerView.getStorePercentUsage());
- assertTrue("redelivery consumer got all it needs, remaining: " + redeliveryConsumerLatch.getCount(), redeliveryConsumerLatch.await(60, TimeUnit.SECONDS));
- assertTrue("dql consumer got all it needs", dlqConsumerLatch.await(60, TimeUnit.SECONDS));
- closeConsumer();
-
- LOG.info("Giving dlq a chance to clear down once topic consumer is closed");
-
- // consumer all of the duplicates that arrived after the first ack
- closeDlqConsumer();
-
- //get broker a chance to clean obsolete messages, wait 2*cleanupInterval
- Thread.sleep(5000);
-
- FilenameFilter justLogFiles = new FilenameFilter() {
- @Override
- public boolean accept(File file, String s) {
- return s.endsWith(".log");
- }
- };
- int numFiles = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles).length;
- if (numFiles > 2) {
- LOG.info(Arrays.toString(((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory().list(justLogFiles)));
- }
- LOG.info("num files: " + numFiles);
- assertEquals("kahaDB dir should contain 1 db file,is: " + numFiles, 1, numFiles);
- }
-
- private void openConsumer(final CountDownLatch latch) throws Exception {
- consumerConnection = (ActiveMQConnection) createConnection();
- consumerConnection.setClientID("cliID");
- consumerConnection.start();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageListener listener = new MessageListener() {
- @Override
- public void onMessage(Message message) {
- latch.countDown();
- try {
- consumerSession.recover();
- }
- catch (Exception ignored) {
- ignored.printStackTrace();
- }
- }
- };
-
- for (int i = 1; i <= numDurableSubs; i++) {
- TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, "subName" + i);
- sub.setMessageListener(listener);
- duralbeSubs.add(sub);
- }
- }
-
- private void openDlqConsumer(final CountDownLatch received) throws Exception {
-
- dlqConnection = (ActiveMQConnection) createConnection();
- Session dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
- dlqConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- if (received.getCount() > 0 && received.getCount() % 200 == 0) {
- LOG.info("remaining on DLQ: " + received.getCount());
- }
- received.countDown();
- dlqConsumerLastReceivedTimeStamp = System.currentTimeMillis();
- dlqReceivedCount.incrementAndGet();
- }
- });
- dlqConnection.start();
- }
-
- private void closeConsumer() throws JMSException {
- for (TopicSubscriber sub : duralbeSubs) {
- sub.close();
- }
- if (consumerSession != null) {
- for (int i = 1; i <= numDurableSubs; i++) {
- consumerSession.unsubscribe("subName" + i);
- }
- }
- if (consumerConnection != null) {
- consumerConnection.close();
- consumerConnection = null;
- }
- }
-
- private void closeDlqConsumer() throws JMSException, InterruptedException {
- final long limit = System.currentTimeMillis() + 30 * 1000;
- if (dlqConsumerLastReceivedTimeStamp > 0) {
- while (System.currentTimeMillis() < dlqConsumerLastReceivedTimeStamp + 5000 && System.currentTimeMillis() < limit) {
- LOG.info("waiting for DLQ do drain, receivedCount: " + dlqReceivedCount);
- TimeUnit.SECONDS.sleep(1);
- }
- }
- if (dlqConnection != null) {
- dlqConnection.close();
- dlqConnection = null;
- }
- }
-
- private void sendMessage(boolean filter) throws Exception {
- if (producerConnection == null) {
- producerConnection = (ActiveMQConnection) createConnection();
- producerConnection.start();
- producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = producerSession.createProducer(topic);
- }
-
- Message message = producerSession.createMessage();
- message.setStringProperty("data", data);
- producer.send(message);
- }
-
- private void startBroker(boolean deleteMessages) throws Exception {
- broker = new BrokerService();
- broker.setAdvisorySupport(false);
- broker.setBrokerName("testStoreSize");
-
- PolicyMap map = new PolicyMap();
- PolicyEntry entry = new PolicyEntry();
- entry.setEnableAudit(false);
- map.setDefaultEntry(entry);
- broker.setDestinationPolicy(map);
-
- if (deleteMessages) {
- broker.setDeleteAllMessagesOnStartup(true);
- }
- configurePersistenceAdapter(broker.getPersistenceAdapter());
- broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000);
- broker.start();
- }
-
- private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) {
- Properties properties = new Properties();
- String maxFileLengthVal = String.valueOf(2 * 1024 * 1024);
- properties.put("journalMaxFileLength", maxFileLengthVal);
- properties.put("maxFileLength", maxFileLengthVal);
- properties.put("cleanupInterval", "2000");
- properties.put("checkpointInterval", "2000");
- // there are problems with duplicate dispatch in the cursor, which maintain
- // a map of messages. A dup dispatch can be dropped.
- // see: org.apache.activemq.broker.region.cursors.OrderedPendingList
- // Adding duplicate detection to the default DLQ strategy removes the problem
- // which means we can leave the default for concurrent store and dispatch q
- //properties.put("concurrentStoreAndDispatchQueues", "false");
-
- IntrospectionSupport.setProperties(persistenceAdapter, properties);
- }
-
- private void stopBroker() throws Exception {
- if (broker != null)
- broker.stop();
- broker = null;
- }
-
- @Override
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0&waitForStart=5000&create=false");
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
-
- StringBuilder sb = new StringBuilder(5000);
- for (int i = 0; i < 5000; i++) {
- sb.append('a');
- }
- data = sb.toString();
-
- startBroker(true);
- topic = (ActiveMQTopic) createDestination();
- }
-
- @Override
- protected void tearDown() throws Exception {
- stopBroker();
- super.tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java
deleted file mode 100644
index 14760d9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(value = Parameterized.class)
-public class AMQ2584Test extends org.apache.activemq.TestSupport {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ2584Test.class);
- BrokerService broker = null;
- ActiveMQTopic topic;
-
- ActiveMQConnection consumerConnection = null, producerConnection = null;
- Session producerSession;
- MessageProducer producer;
- final int minPercentUsageForStore = 3;
- String data;
-
- private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
-
- @Parameterized.Parameters(name = "{0}")
- public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
- TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB};
- TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB};
- List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<>();
- choices.add(kahaDb);
- choices.add(levelDb);
-
- return choices;
- }
-
- public AMQ2584Test(TestSupport.PersistenceAdapterChoice choice) {
- this.persistenceAdapterChoice = choice;
- }
-
- @Test(timeout = 120000)
- public void testSize() throws Exception {
- int messages = 1000;
- CountDownLatch redeliveryConsumerLatch = new CountDownLatch((messages * 3));
- openConsumer(redeliveryConsumerLatch);
-
- assertEquals(0, broker.getAdminView().getStorePercentUsage());
-
- for (int i = 0; i < messages; i++) {
- sendMessage(false);
- }
-
- final BrokerView brokerView = broker.getAdminView();
-
- broker.getSystemUsage().getStoreUsage().isFull();
- LOG.info("store percent usage: " + brokerView.getStorePercentUsage());
- int storePercentUsage = broker.getAdminView().getStorePercentUsage();
- assertTrue("some store in use", storePercentUsage > minPercentUsageForStore);
-
- assertTrue("redelivery consumer got all it needs", redeliveryConsumerLatch.await(60, TimeUnit.SECONDS));
- closeConsumer();
-
- // consume from DLQ
- final CountDownLatch received = new CountDownLatch(messages);
- consumerConnection = (ActiveMQConnection) createConnection();
- Session dlqSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
- dlqConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- if (received.getCount() % 500 == 0) {
- LOG.info("remaining on DLQ: " + received.getCount());
- }
- received.countDown();
- }
- });
- consumerConnection.start();
-
- assertTrue("Not all messages reached the DLQ", received.await(60, TimeUnit.SECONDS));
-
- assertTrue("Store usage exceeds expected usage", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- broker.getSystemUsage().getStoreUsage().isFull();
- LOG.info("store precent usage: " + brokerView.getStorePercentUsage());
- return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore;
- }
- }));
-
- closeConsumer();
-
- }
-
- private void openConsumer(final CountDownLatch latch) throws Exception {
- consumerConnection = (ActiveMQConnection) createConnection();
- consumerConnection.setClientID("cliID");
- consumerConnection.start();
- final Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageListener listener = new MessageListener() {
- @Override
- public void onMessage(Message message) {
- latch.countDown();
- try {
- session.recover();
- }
- catch (Exception ignored) {
- ignored.printStackTrace();
- }
-
- }
- };
-
- session.createDurableSubscriber(topic, "subName1").setMessageListener(listener);
- session.createDurableSubscriber(topic, "subName2").setMessageListener(listener);
- session.createDurableSubscriber(topic, "subName3").setMessageListener(listener);
- }
-
- private void closeConsumer() throws JMSException {
- if (consumerConnection != null)
- consumerConnection.close();
- consumerConnection = null;
- }
-
- private void sendMessage(boolean filter) throws Exception {
- if (producerConnection == null) {
- producerConnection = (ActiveMQConnection) createConnection();
- producerConnection.start();
- producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = producerSession.createProducer(topic);
- }
-
- Message message = producerSession.createMessage();
- message.setStringProperty("data", data);
- producer.send(message);
- }
-
- private void startBroker(boolean deleteMessages) throws Exception {
- broker = new BrokerService();
- broker.setAdvisorySupport(false);
- broker.setBrokerName("testStoreSize");
-
- if (deleteMessages) {
- broker.setDeleteAllMessagesOnStartup(true);
- }
- LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
- setPersistenceAdapter(broker, persistenceAdapterChoice);
- configurePersistenceAdapter(broker.getPersistenceAdapter());
- broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000);
- broker.start();
- }
-
- private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) {
- Properties properties = new Properties();
- String maxFileLengthVal = String.valueOf(1 * 1024 * 1024);
- properties.put("journalMaxFileLength", maxFileLengthVal);
- properties.put("maxFileLength", maxFileLengthVal);
- properties.put("cleanupInterval", "2000");
- properties.put("checkpointInterval", "2000");
-
- IntrospectionSupport.setProperties(persistenceAdapter, properties);
- }
-
- private void stopBroker() throws Exception {
- if (broker != null)
- broker.stop();
- broker = null;
- }
-
- @Override
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&jms.redeliveryPolicy.maximumRedeliveries=0&jms.closeTimeout=60000&waitForStart=5000&create=false");
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- StringBuilder sb = new StringBuilder(5000);
- for (int i = 0; i < 5000; i++) {
- sb.append('a');
- }
- data = sb.toString();
-
- startBroker(true);
- topic = (ActiveMQTopic) createDestination();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- stopBroker();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java
deleted file mode 100644
index 71cb2a8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.EmbeddedBrokerAndConnectionTestSupport;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.spring.ConsumerBean;
-
-public class AMQ2585Test extends EmbeddedBrokerAndConnectionTestSupport {
-
- private final Destination destination = new ActiveMQQueue("MyQueue");
- final static String LENGTH10STRING = "1234567890";
- private Session session;
- private MessageProducer producer;
- private ConsumerBean messageList;
-
- public void testOneMessageWithProperties() throws Exception {
- TextMessage message = session.createTextMessage(LENGTH10STRING);
- message.setStringProperty(LENGTH10STRING, LENGTH10STRING);
- producer.send(message);
-
- messageList.assertMessagesArrived(1);
-
- ActiveMQTextMessage received = ((ActiveMQTextMessage) messageList.flushMessages().get(0));
-
- assertEquals(LENGTH10STRING, received.getText());
- assertTrue(received.getProperties().size() > 0);
- assertTrue(received.propertyExists(LENGTH10STRING));
- assertEquals(LENGTH10STRING, received.getStringProperty(LENGTH10STRING));
-
- /**
- * As specified by getSize(), the size (memory usage) of the body should
- * be length of text * 2. Unsure of how memory usage is calculated for
- * properties, but should probably not be less than the sum of (string)
- * lengths for the key name and value.
- */
-
- final int sizeShouldBeNoLessThan = LENGTH10STRING.length() * 4 + Message.DEFAULT_MINIMUM_MESSAGE_SIZE;
- assertTrue("Message size was smaller than expected: " + received.getSize(), received.getSize() >= sizeShouldBeNoLessThan);
- assertFalse(LENGTH10STRING.length() * 2 == received.getSize());
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = bindAddress + "?marshal=true";
- super.setUp();
- messageList = new ConsumerBean();
- messageList.setVerbose(true);
-
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer messageConsumer = session.createConsumer(destination);
-
- messageConsumer.setMessageListener(messageList);
-
- producer = session.createProducer(destination);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
deleted file mode 100644
index f22ff48..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.IOHelper;
-
-public class AMQ2616Test extends TestCase {
-
- private static final int NUMBER = 2000;
- private BrokerService brokerService;
- private final ArrayList<Thread> threads = new ArrayList<>();
- private final String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:0";
- private final AtomicBoolean shutdown = new AtomicBoolean();
-
- private String connectionUri;
-
- public void testQueueResourcesReleased() throws Exception {
- ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(connectionUri);
- Connection tempConnection = fac.createConnection();
- tempConnection.start();
- Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue tempQueue = tempSession.createTemporaryQueue();
-
- Connection testConnection = fac.createConnection();
- long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
- Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer testProducer = testSession.createProducer(tempQueue);
- byte[] payload = new byte[1024 * 4];
- for (int i = 0; i < NUMBER; i++) {
- BytesMessage msg = testSession.createBytesMessage();
- msg.writeBytes(payload);
- testProducer.send(msg);
- }
- long endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
- assertFalse(startUsage == endUsage);
- tempConnection.close();
- Thread.sleep(1000);
- endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
- assertEquals(startUsage, endUsage);
- }
-
- @Override
- protected void setUp() throws Exception {
- // Start an embedded broker up.
- brokerService = new BrokerService();
-
- KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
- adaptor.setEnableJournalDiskSyncs(false);
- File file = new File("target/AMQ2616Test");
- IOHelper.mkdirs(file);
- IOHelper.deleteChildren(file);
- adaptor.setDirectory(file);
- brokerService.setPersistenceAdapter(adaptor);
-
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry pe = new PolicyEntry();
- pe.setMemoryLimit(10 * 1024 * 1024);
- pe.setOptimizedDispatch(true);
- pe.setProducerFlowControl(false);
- pe.setExpireMessagesPeriod(1000);
- pe.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
- policyMap.put(new ActiveMQQueue(">"), pe);
- brokerService.setDestinationPolicy(policyMap);
- brokerService.getSystemUsage().getMemoryUsage().setLimit(20 * 1024 * 1024);
- brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024);
- brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
- brokerService.start();
- brokerService.waitUntilStarted();
-
- connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
-
- new ActiveMQQueue(getName());
- }
-
- @Override
- protected void tearDown() throws Exception {
- // Stop any running threads.
- shutdown.set(true);
- for (Thread t : threads) {
- t.interrupt();
- t.join();
- }
- brokerService.stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java
deleted file mode 100644
index 61a5d1e..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2645Test extends EmbeddedBrokerTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2645Test.class);
- private final static String QUEUE_NAME = "test.daroo.q";
-
- public void testWaitForTransportInterruptionProcessingHang() throws Exception {
- final ConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + this.bindAddress + ")");
- final Connection connection = fac.createConnection();
- try {
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue queue = session.createQueue(QUEUE_NAME);
- final MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- connection.start();
-
- producer.send(session.createTextMessage("test"));
-
- final CountDownLatch afterRestart = new CountDownLatch(1);
- final CountDownLatch twoNewMessages = new CountDownLatch(1);
- final CountDownLatch thirdMessageReceived = new CountDownLatch(1);
-
- final MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- afterRestart.await();
-
- final TextMessage txtMsg = (TextMessage) message;
- if (txtMsg.getText().equals("test")) {
- producer.send(session.createTextMessage("test 1"));
- TimeUnit.SECONDS.sleep(5);
- // THIS SECOND send() WILL CAUSE CONSUMER DEADLOCK
- producer.send(session.createTextMessage("test 2"));
- LOG.info("Two new messages produced.");
- twoNewMessages.countDown();
- }
- else if (txtMsg.getText().equals("test 3")) {
- thirdMessageReceived.countDown();
- }
- }
- catch (Exception e) {
- LOG.error(e.toString());
- throw new RuntimeException(e);
- }
- }
- });
-
- LOG.info("Stopping broker....");
- broker.stop();
-
- LOG.info("Creating new broker...");
- broker = createBroker();
- startBroker();
- broker.waitUntilStarted();
-
- afterRestart.countDown();
- assertTrue("Consumer is deadlocked!", twoNewMessages.await(60, TimeUnit.SECONDS));
-
- producer.send(session.createTextMessage("test 3"));
- assertTrue("Consumer got third message after block", thirdMessageReceived.await(60, TimeUnit.SECONDS));
-
- }
- finally {
- broker.stop();
- }
-
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = "tcp://0.0.0.0:61617";
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
deleted file mode 100644
index 533b827..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
-import org.junit.After;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-public class AMQ2736Test {
-
- BrokerService broker;
-
- @Test
- public void testRollbackOnRecover() throws Exception {
- broker = createAndStartBroker(true);
- DefaultIOExceptionHandler ignoreAllExceptionsIOExHandler = new DefaultIOExceptionHandler();
- ignoreAllExceptionsIOExHandler.setIgnoreAllErrors(true);
- broker.setIoExceptionHandler(ignoreAllExceptionsIOExHandler);
-
- ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost?async=false");
- f.setAlwaysSyncSend(true);
- Connection c = f.createConnection();
- c.start();
- Session s = c.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer p = s.createProducer(new ActiveMQQueue("Tx"));
- p.send(s.createTextMessage("aa"));
-
- // kill journal without commit
- KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
- KahaDBStore store = pa.getStore();
-
- assertNotNull("last tx location is present " + store.getInProgressTxLocationRange()[1]);
-
- // test hack, close the journal to ensure no further journal updates when broker stops
- // mimic kill -9 in terms of no normal shutdown sequence
- store.getJournal().close();
- try {
- store.close();
- }
- catch (Exception expectedLotsAsJournalBorked) {
- }
-
- broker.stop();
- broker.waitUntilStopped();
-
- // restart with recovery
- broker = createAndStartBroker(false);
-
- pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
- store = pa.getStore();
-
- // inflight non xa tx should be rolledback on recovery
- assertNull("in progress tx location is present ", store.getInProgressTxLocationRange()[0]);
-
- }
-
- @After
- public void stopBroker() throws Exception {
- if (broker != null) {
- broker.stop();
- }
- }
-
- private BrokerService createAndStartBroker(boolean deleteAll) throws Exception {
- BrokerService broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(deleteAll);
- broker.setUseJmx(false);
- broker.getManagementContext().setCreateConnector(false);
- broker.start();
- return broker;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java
deleted file mode 100644
index 539354c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2751Test extends EmbeddedBrokerTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2751Test.class);
-
- private static String clientIdPrefix = "consumer";
- private static String queueName = "FOO";
-
- public void testRecoverRedelivery() throws Exception {
-
- final CountDownLatch redelivery = new CountDownLatch(6);
- final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
- try {
-
- Connection connection = factory.createConnection();
- String clientId = clientIdPrefix;
- connection.setClientID(clientId);
-
- final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- Queue queue = session.createQueue(queueName);
-
- MessageConsumer consumer = session.createConsumer(queue);
-
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- LOG.info("Got message: " + message.getJMSMessageID());
- if (message.getJMSRedelivered()) {
- LOG.info("It's a redelivery.");
- redelivery.countDown();
- }
- LOG.info("calling recover() on the session to force redelivery.");
- session.recover();
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
-
- System.out.println("Created queue consumer with clientId " + clientId);
- connection.start();
-
- MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage("test"));
-
- assertTrue("we got 6 redeliveries", redelivery.await(20, TimeUnit.SECONDS));
-
- }
- finally {
- broker.stop();
- }
-
- }
-
- @Override
- protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:0";
- super.setUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java
deleted file mode 100644
index 43394dc..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
-import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.usage.SystemUsage;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2801Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2801Test.class);
-
- private static final String TOPICNAME = "InvalidPendingQueueTest";
- private static final String SELECTOR1 = "JMS_ID" + " = '" + "TEST" + "'";
- private static final String SELECTOR2 = "JMS_ID" + " = '" + "TEST2" + "'";
- private static final String SUBSCRIPTION1 = "InvalidPendingQueueTest_1";
- private static final String SUBSCRIPTION2 = "InvalidPendingQueueTest_2";
- private static final int MSG_COUNT = 2500;
- private Session session1;
- private Connection conn1;
- private Topic topic1;
- private MessageConsumer consumer1;
- private Session session2;
- private Connection conn2;
- private Topic topic2;
- private MessageConsumer consumer2;
- private BrokerService broker;
- private String connectionUri;
-
- @Before
- public void setUp() throws Exception {
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.addConnector("tcp://localhost:0").setName("Default");
- applyMemoryLimitPolicy(broker);
- broker.start();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- private void applyMemoryLimitPolicy(BrokerService broker) {
- final SystemUsage memoryManager = new SystemUsage();
- memoryManager.getMemoryUsage().setLimit(5818230784L);
- memoryManager.getStoreUsage().setLimit(6442450944L);
- memoryManager.getTempUsage().setLimit(3221225472L);
- broker.setSystemUsage(memoryManager);
-
- final List<PolicyEntry> policyEntries = new ArrayList<>();
- final PolicyEntry entry = new PolicyEntry();
- entry.setQueue(">");
- entry.setProducerFlowControl(false);
- entry.setMemoryLimit(504857608);
- entry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
- policyEntries.add(entry);
-
- final PolicyMap policyMap = new PolicyMap();
- policyMap.setPolicyEntries(policyEntries);
- broker.setDestinationPolicy(policyMap);
- }
-
- @After
- public void tearDown() throws Exception {
- conn1.close();
- conn2.close();
- if (broker != null) {
- broker.stop();
- }
- }
-
- private void produceMessages() throws Exception {
- TopicConnection connection = createConnection();
- TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(TOPICNAME);
- TopicPublisher producer = session.createPublisher(topic);
- connection.start();
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- long tStamp = System.currentTimeMillis();
- BytesMessage message = session2.createBytesMessage();
- for (int i = 1; i <= MSG_COUNT; i++) {
- message.setStringProperty("JMS_ID", "TEST");
- message.setIntProperty("Type", i);
- producer.publish(message);
- if (i % 100 == 0) {
- LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
- tStamp = System.currentTimeMillis();
- }
- }
- }
-
- private void activeateSubscribers() throws Exception {
- // First consumer
- conn1 = createConnection();
- conn1.setClientID(SUBSCRIPTION1);
- session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
- topic1 = session1.createTopic(TOPICNAME);
- consumer1 = session1.createDurableSubscriber(topic1, SUBSCRIPTION1, SELECTOR1, false);
- conn1.start();
-
- // Second consumer that just exists
- conn2 = createConnection();
- conn2.setClientID(SUBSCRIPTION2);
- session2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
- topic2 = session2.createTopic(TOPICNAME);
- consumer2 = session2.createDurableSubscriber(topic2, SUBSCRIPTION2, SELECTOR2, false);
- conn2.start();
- }
-
- @Test
- public void testInvalidPendingQueue() throws Exception {
-
- activeateSubscribers();
-
- assertNotNull(consumer1);
- assertNotNull(consumer2);
-
- produceMessages();
- LOG.debug("Sent messages to a single subscriber");
- Thread.sleep(2000);
-
- LOG.debug("Closing durable subscriber connections");
- conn1.close();
- conn2.close();
- LOG.debug("Closed durable subscriber connections");
-
- Thread.sleep(2000);
- LOG.debug("Re-starting durable subscriber connections");
-
- activeateSubscribers();
- LOG.debug("Started up durable subscriber connections - now view activemq console to see pending queue size on the other subscriber");
-
- ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
-
- for (int i = 0; i < subs.length; i++) {
- ObjectName subName = subs[i];
- DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
-
- LOG.info(sub.getSubscriptionName() + ": pending = " + sub.getPendingQueueSize() + ", dispatched: " + sub.getDispatchedQueueSize());
- if (sub.getSubscriptionName().equals(SUBSCRIPTION1)) {
- assertEquals("Incorrect number of pending messages", MSG_COUNT, sub.getPendingQueueSize() + sub.getDispatchedQueueSize());
- }
- else {
- assertEquals("Incorrect number of pending messages", 0, sub.getPendingQueueSize());
- }
- }
- }
-
- private TopicConnection createConnection() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
- connectionFactory.setBrokerURL(connectionUri);
- TopicConnection conn = connectionFactory.createTopicConnection();
- return conn;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
deleted file mode 100644
index f089941..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.leveldb.LevelDBStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.disk.journal.DataFile;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2832Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class);
-
- BrokerService broker = null;
- private ActiveMQConnectionFactory cf;
- private final Destination destination = new ActiveMQQueue("AMQ2832Test");
- private String connectionUri;
-
- protected void startBroker() throws Exception {
- doStartBroker(true, false);
- }
-
- protected void restartBroker() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- doStartBroker(false, false);
- }
-
- protected void recoverBroker() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- doStartBroker(false, true);
- }
-
- private void doStartBroker(boolean delete, boolean recover) throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(delete);
- broker.setPersistent(true);
- broker.setUseJmx(true);
- broker.addConnector("tcp://localhost:0");
-
- configurePersistence(broker, recover);
-
- connectionUri = "vm://localhost?create=false";
- cf = new ActiveMQConnectionFactory(connectionUri);
-
- broker.start();
- LOG.info("Starting broker..");
- }
-
- protected void configurePersistence(BrokerService brokerService, boolean recover) throws Exception {
- KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
-
- // ensure there are a bunch of data files but multiple entries in each
- adapter.setJournalMaxFileLength(1024 * 20);
-
- // speed up the test case, checkpoint and cleanup early and often
- adapter.setCheckpointInterval(5000);
- adapter.setCleanupInterval(5000);
-
- if (recover) {
- adapter.setForceRecoverIndex(true);
- }
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- /**
- * Scenario:
- * db-1.log has an unacknowledged message,
- * db-2.log contains acks for the messages from db-1.log,
- * db-3.log contains acks for the messages from db-2.log
- *
- * Expected behavior: since db-1.log is blocked, db-2.log and db-3.log should not be removed during the cleanup.
- * Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing all messages from db-2.log, whose acks were in db-3.log, to be replayed.
- *
- * @throws Exception
- */
- @Test
- public void testAckChain() throws Exception {
- startBroker();
-
- StagedConsumer consumer = new StagedConsumer();
- // file #1
- produceMessagesToConsumeMultipleDataFiles(5);
- // acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log
- consumer.receive(3);
-
- // send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together
- // this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one
- produceAndConsumeImmediately(20, consumer);
- consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed
-
- // now we have 3 files written and started with #4
- consumer.close();
-
- broker.stop();
- broker.waitUntilStopped();
-
- recoverBroker();
-
- consumer = new StagedConsumer();
- Message message = consumer.receive(1);
- assertNotNull("One message stays unacked from db-1.log", message);
- message.acknowledge();
- message = consumer.receive(1);
- assertNull("There should not be any unconsumed messages any more", message);
- consumer.close();
- }
-
- private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception {
- for (int i = 0; i < numOfMsgs; i++) {
- produceMessagesToConsumeMultipleDataFiles(1);
- consumer.receive(1).acknowledge();
- }
- }
-
- @Test
- public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
-
- startBroker();
-
- StagedConsumer consumer = new StagedConsumer();
- int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20);
- // this will block the reclaiming of one data file
- Message firstUnacked = consumer.receive(10);
- LOG.info("first unacked: " + firstUnacked.getJMSMessageID());
- Message secondUnacked = consumer.receive(1);
- LOG.info("second unacked: " + secondUnacked.getJMSMessageID());
- numMessagesAvailable -= 11;
-
- numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
- // ensure ack is another data file
- LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID());
- firstUnacked.acknowledge();
-
- numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
-
- consumer.receive(numMessagesAvailable).acknowledge();
-
- // second unacked should keep first data file available but journal with the first ack
- // may get whacked
- consumer.close();
-
- broker.stop();
- broker.waitUntilStopped();
-
- recoverBroker();
-
- consumer = new StagedConsumer();
- // need to force recovery?
-
- Message msg = consumer.receive(1, 5);
- assertNotNull("One messages left after recovery", msg);
- msg.acknowledge();
-
- // should be no more messages
- msg = consumer.receive(1, 5);
- assertEquals("Only one messages left after recovery: " + msg, null, msg);
- consumer.close();
- }
-
- @Test
- public void testAlternateLossScenario() throws Exception {
-
- startBroker();
- PersistenceAdapter pa = broker.getPersistenceAdapter();
- if (pa instanceof LevelDBStore) {
- return;
- }
-
- ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
- ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue");
- ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
-
- // This ensure that data file 1 never goes away.
- createInactiveDurableSub(topic);
- assertEquals(1, getNumberOfJournalFiles());
-
- // One Queue Message that will be acked in another data file.
- produceMessages(queue, 1);
- assertEquals(1, getNumberOfJournalFiles());
-
- // Add some messages to consume space
- produceMessages(disposable, 50);
-
- int dataFilesCount = getNumberOfJournalFiles();
- assertTrue(dataFilesCount > 1);
-
- // Create an ack for the single message on this queue
- drainQueue(queue);
-
- // Add some more messages to consume space beyond tha data file with the ack
- produceMessages(disposable, 50);
-
- assertTrue(dataFilesCount < getNumberOfJournalFiles());
- dataFilesCount = getNumberOfJournalFiles();
-
- restartBroker();
-
- // Clear out all queue data
- broker.getAdminView().removeQueue(disposable.getQueueName());
-
- // Once this becomes true our ack could be lost.
- assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return getNumberOfJournalFiles() <= 3;
- }
- }, TimeUnit.MINUTES.toMillis(3)));
-
- // Recover and the Message should not be replayed but if the old MessageAck is lost
- // then it could be.
- recoverBroker();
-
- assertTrue(drainQueue(queue) == 0);
- }
-
- private int getNumberOfJournalFiles() throws IOException {
-
- Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
- int reality = 0;
- for (DataFile file : files) {
- if (file != null) {
- reality++;
- }
- }
-
- return reality;
- }
-
- private void createInactiveDurableSub(Topic topic) throws Exception {
- Connection connection = cf.createConnection();
- connection.setClientID("Inactive");
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
- consumer.close();
- connection.close();
- produceMessages(topic, 1);
- }
-
- private int drainQueue(Queue queue) throws Exception {
- Connection connection = cf.createConnection();
- connection.setClientID("Inactive");
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(queue);
- int count = 0;
- while (consumer.receive(5000) != null) {
- count++;
- }
- consumer.close();
- connection.close();
- return count;
- }
-
- private int produceMessages(Destination destination, int numToSend) throws Exception {
- int sent = 0;
- Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
- connection.start();
- try {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < numToSend; i++) {
- producer.send(createMessage(session, i));
- sent++;
- }
- }
- finally {
- connection.close();
- }
-
- return sent;
- }
-
- private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
- return produceMessages(destination, numToSend);
- }
-
- final String payload = new String(new byte[1024]);
-
- private Message createMessage(Session session, int i) throws Exception {
- return session.createTextMessage(payload + "::" + i);
- }
-
- private class StagedConsumer {
-
- Connection connection;
- MessageConsumer consumer;
-
- StagedConsumer() throws Exception {
- connection = new ActiveMQConnectionFactory("failover://" + broker.getTransportConnectors().get(0).getConnectUri().toString()).createConnection();
- connection.start();
- consumer = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE).createConsumer(destination);
- }
-
- public Message receive(int numToReceive) throws Exception {
- return receive(numToReceive, 2);
- }
-
- public Message receive(int numToReceive, int timeoutInSeconds) throws Exception {
- Message msg = null;
- for (; numToReceive > 0; numToReceive--) {
-
- do {
- msg = consumer.receive(1 * 1000);
- } while (msg == null && --timeoutInSeconds > 0);
-
- if (numToReceive > 1) {
- msg.acknowledge();
- }
-
- if (msg != null) {
- LOG.debug("received: " + msg.getJMSMessageID());
- }
- }
- // last message, unacked
- return msg;
- }
-
- void close() throws JMSException {
- consumer.close();
- connection.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java
deleted file mode 100644
index b4f0a33..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(value = Parameterized.class)
-public class AMQ2870Test extends org.apache.activemq.TestSupport {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ2870Test.class);
- BrokerService broker = null;
- ActiveMQTopic topic;
-
- ActiveMQConnection consumerConnection = null, producerConnection = null;
- Session producerSession;
- MessageProducer producer;
- final int minPercentUsageForStore = 10;
- String data;
-
- private final PersistenceAdapterChoice persistenceAdapterChoice;
-
- @Parameterized.Parameters
- public static Collection<PersistenceAdapterChoice[]> getTestParameters() {
- String osName = System.getProperty("os.name");
- LOG.info("Running on [" + osName + "]");
- PersistenceAdapterChoice[] kahaDb = {PersistenceAdapterChoice.KahaDB};
- PersistenceAdapterChoice[] levelDb = {PersistenceAdapterChoice.LevelDB};
- List<PersistenceAdapterChoice[]> choices = new ArrayList<>();
- choices.add(kahaDb);
- if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
- choices.add(levelDb);
- }
-
- return choices;
- }
-
- public AMQ2870Test(PersistenceAdapterChoice choice) {
- this.persistenceAdapterChoice = choice;
- }
-
- @Test(timeout = 300000)
- public void testSize() throws Exception {
- openConsumer();
-
- assertEquals(0, broker.getAdminView().getStorePercentUsage());
-
- for (int i = 0; i < 5000; i++) {
- sendMessage(false);
- }
-
- final BrokerView brokerView = broker.getAdminView();
-
- // wait for reclaim
- assertTrue("in range with consumer", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- // usage percent updated only on send check for isFull so once
- // sends complete it is no longer updated till next send via a call to isFull
- // this is optimal as it is only used to block producers
- broker.getSystemUsage().getStoreUsage().isFull();
- LOG.info("store percent usage: " + brokerView.getStorePercentUsage());
- return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore;
- }
- }));
-
- closeConsumer();
-
- assertTrue("in range with closed consumer", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- broker.getSystemUsage().getStoreUsage().isFull();
- LOG.info("store precent usage: " + brokerView.getStorePercentUsage());
- return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore;
- }
- }));
-
- for (int i = 0; i < 5000; i++) {
- sendMessage(false);
- }
-
- // What if i drop the subscription?
- broker.getAdminView().destroyDurableSubscriber("cliID", "subName");
-
- assertTrue("in range after send with consumer", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- broker.getSystemUsage().getStoreUsage().isFull();
- LOG.info("store precent usage: " + brokerView.getStorePercentUsage());
- return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore;
- }
- }));
- }
-
- private void openConsumer() throws Exception {
- consumerConnection = (ActiveMQConnection) createConnection();
- consumerConnection.setClientID("cliID");
- consumerConnection.start();
- Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subName", "filter=true", false);
-
- subscriber.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- // received++;
- }
- });
- }
-
- private void closeConsumer() throws JMSException {
- if (consumerConnection != null)
- consumerConnection.close();
- consumerConnection = null;
- }
-
- private void sendMessage(boolean filter) throws Exception {
- if (producerConnection == null) {
- producerConnection = (ActiveMQConnection) createConnection();
- producerConnection.start();
- producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = producerSession.createProducer(topic);
- }
-
- Message message = producerSession.createMessage();
- message.setBooleanProperty("filter", filter);
- message.setStringProperty("data", data);
- producer.send(message);
- }
-
- private void startBroker(boolean deleteMessages) throws Exception {
- broker = new BrokerService();
- broker.setAdvisorySupport(false);
- broker.setBrokerName("testStoreSize");
-
- if (deleteMessages) {
- broker.setDeleteAllMessagesOnStartup(true);
- }
- LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
- setPersistenceAdapter(broker, persistenceAdapterChoice);
- configurePersistenceAdapter(broker.getPersistenceAdapter());
- broker.getSystemUsage().getStoreUsage().setLimit(100 * 1000 * 1000);
- broker.start();
- }
-
- private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) {
- Properties properties = new Properties();
- String maxFileLengthVal = String.valueOf(2 * 1024 * 1024);
- properties.put("journalMaxFileLength", maxFileLengthVal);
- properties.put("maxFileLength", maxFileLengthVal);
- properties.put("cleanupInterval", "2000");
- properties.put("checkpointInterval", "2000");
-
- // leveldb
- properties.put("logSize", maxFileLengthVal);
-
- IntrospectionSupport.setProperties(persistenceAdapter, properties);
- }
-
- private void stopBroker() throws Exception {
- if (broker != null)
- broker.stop();
- broker = null;
- }
-
- @Override
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&waitForStart=5000&create=false");
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- StringBuilder sb = new StringBuilder(5000);
- for (int i = 0; i < 5000; i++) {
- sb.append('a');
- }
- data = sb.toString();
-
- startBroker(true);
- topic = (ActiveMQTopic) createDestination();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- stopBroker();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bcfd089/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
deleted file mode 100644
index 798d32f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.TransportConnection;
-import org.apache.activemq.transport.TransportDisposedIOException;
-import org.apache.activemq.util.DefaultTestAppender;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.spi.LoggingEvent;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2902Test extends TestCase {
-
- private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AMQ2580Test.class);
-
- final AtomicBoolean gotExceptionInLog = new AtomicBoolean(Boolean.FALSE);
- final AtomicBoolean failedToFindMDC = new AtomicBoolean(Boolean.FALSE);
-
- Appender appender = new DefaultTestAppender() {
- @Override
- public void doAppend(LoggingEvent event) {
- if (event.getThrowableInformation() != null && event.getThrowableInformation().getThrowable() instanceof TransportDisposedIOException) {
-
- // Prevent StackOverflowException so we can see a sane stack trace.
- if (gotExceptionInLog.get()) {
- return;
- }
-
- gotExceptionInLog.set(Boolean.TRUE);
- LOG.error("got event: " + event + ", ex:" + event.getThrowableInformation().getThrowable(), event.getThrowableInformation().getThrowable());
- LOG.error("Event source: ", new Throwable("Here"));
- }
- if (!"Loaded the Bouncy Castle security provider.".equals(event.getMessage())) {
- if (event.getMDC("activemq.broker") == null) {
- failedToFindMDC.set(Boolean.TRUE);
- }
- }
- return;
- }
- };
-
- public void testNoExceptionOnClosewithStartStop() throws JMSException {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
- Connection connection = connectionFactory.createConnection();
- connection.start();
- connection.stop();
- connection.close();
- }
-
- public void testNoExceptionOnClose() throws JMSException {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
- Connection connection = connectionFactory.createConnection();
- connection.close();
- }
-
- @Override
- public void setUp() throws Exception {
- gotExceptionInLog.set(Boolean.FALSE);
- failedToFindMDC.set(Boolean.FALSE);
- Logger.getRootLogger().addAppender(appender);
- Logger.getLogger(TransportConnection.class.getName() + ".Transport").setLevel(Level.DEBUG);
- Logger.getLogger(TransportConnection.class.getName()).setLevel(Level.DEBUG);
- }
-
- @Override
- public void tearDown() throws Exception {
- Logger.getRootLogger().removeAppender(appender);
- assertFalse("got unexpected ex in log on graceful close", gotExceptionInLog.get());
- assertFalse("MDC is there", failedToFindMDC.get());
- }
-}