You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/31 04:30:56 UTC
[14/69] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
deleted file mode 100644
index 98fc79b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
+++ /dev/null
@@ -1,617 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Non transactional concurrent producer/consumer to single dest
- */
-@RunWith(Parameterized.class)
-public class AMQ5266SingleDestTest {
-
- static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class);
- String activemqURL;
- BrokerService brokerService;
-
- public int numDests = 1;
- public int messageSize = 10 * 1000;
-
- @Parameterized.Parameter(0)
- public int publisherMessagesPerThread = 1000;
-
- @Parameterized.Parameter(1)
- public int publisherThreadCount = 20;
-
- @Parameterized.Parameter(2)
- public int consumerThreadsPerQueue = 5;
-
- @Parameterized.Parameter(3)
- public int destMemoryLimit = 50 * 1024;
-
- @Parameterized.Parameter(4)
- public boolean useCache = true;
-
- @Parameterized.Parameter(5)
- public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
-
- @Parameterized.Parameter(6)
- public boolean optimizeDispatch = false;
-
- @Parameterized.Parameters(name = "#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
- public static Iterable<Object[]> parameters() {
- return Arrays.asList(new Object[][]{{1000, 40, 40, 1024 * 1024 * 1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 40, 40, 1024 * 1024 * 1, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 40, 40, 1024 * 1024 * 1, true, TestSupport.PersistenceAdapterChoice.JDBC, false},});
- }
-
- public int consumerBatchSize = 25;
-
- @BeforeClass
- public static void derbyTestMode() throws Exception {
- System.setProperty("derby.system.durability", "test");
- }
-
- @Before
- public void startBroker() throws Exception {
- brokerService = new BrokerService();
-
- TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.setUseJmx(false);
- brokerService.setAdvisorySupport(false);
-
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
- defaultEntry.setMaxProducersToAudit(publisherThreadCount);
- defaultEntry.setEnableAudit(true);
- defaultEntry.setUseCache(useCache);
- defaultEntry.setMaxPageSize(1000);
- defaultEntry.setOptimizedDispatch(optimizeDispatch);
- defaultEntry.setMemoryLimit(destMemoryLimit);
- defaultEntry.setExpireMessagesPeriod(0);
- policyMap.setDefaultEntry(defaultEntry);
- brokerService.setDestinationPolicy(policyMap);
-
- brokerService.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024);
-
- TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
- brokerService.start();
- activemqURL = transportConnector.getPublishableConnectString();
- activemqURL += "?jms.watchTopicAdvisories=false"; // ensure all messages are queue or dlq messages
- }
-
- @After
- public void stopBroker() throws Exception {
- if (brokerService != null) {
- brokerService.stop();
- }
- }
-
- @Test
- public void test() throws Exception {
-
- String activemqQueues = "activemq";
- for (int i = 1; i < numDests; i++) {
- activemqQueues += ",activemq" + i;
- }
-
- int consumerWaitForConsumption = 5 * 60 * 1000;
-
- ExportQueuePublisher publisher = null;
- ExportQueueConsumer consumer = null;
-
- LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
- LOG.info("\nBuilding Publisher...");
-
- publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
-
- LOG.info("Building Consumer...");
-
- consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
-
- long totalStart = System.currentTimeMillis();
-
- LOG.info("Starting Publisher...");
-
- publisher.start();
-
- LOG.info("Starting Consumer...");
-
- consumer.start();
-
- int distinctPublishedCount = 0;
-
- LOG.info("Waiting For Publisher Completion...");
-
- publisher.waitForCompletion();
-
- List<String> publishedIds = publisher.getIDs();
- distinctPublishedCount = new TreeSet<>(publishedIds).size();
-
- LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
- LOG.info("Publisher duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart));
-
- long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
- while (!consumer.completed() && System.currentTimeMillis() < endWait) {
- try {
- int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
- LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
- Thread.sleep(1000);
- }
- catch (Exception e) {
- }
- }
-
- LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down.");
-
- LOG.info("Total duration: {}", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - totalStart));
-
- consumer.shutdown();
-
- TimeUnit.SECONDS.sleep(2);
-
- LOG.info("Consumer Stats:");
-
- for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
-
- List<String> idList = entry.getValue();
-
- int distinctConsumed = new TreeSet<>(idList).size();
-
- StringBuilder sb = new StringBuilder();
- sb.append(" Queue: " + entry.getKey() +
- " -> Total Messages Consumed: " + idList.size() +
- ", Distinct IDs Consumed: " + distinctConsumed);
-
- int diff = distinctPublishedCount - distinctConsumed;
- sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
- LOG.info(sb.toString());
-
- assertEquals("expect to get all messages!", 0, diff);
-
- }
-
- // verify empty dlq
- assertEquals("No pending messages", 0L, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
- }
-
- public class ExportQueuePublisher {
-
- private final String amqUser = ActiveMQConnection.DEFAULT_USER;
- private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
- private ActiveMQConnectionFactory connectionFactory = null;
- private String activemqURL = null;
- private String activemqQueues = null;
- // Collection of distinct IDs that the publisher has published.
- // After a message is published, its UUID will be written to this list for tracking.
- // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
- //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
- private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
- private List<PublisherThread> threads;
-
- public ExportQueuePublisher(String activemqURL,
- String activemqQueues,
- int messagesPerThread,
- int threadCount) throws Exception {
-
- this.activemqURL = activemqURL;
- this.activemqQueues = activemqQueues;
-
- threads = new ArrayList<>();
-
- // Build the threads and tell them how many messages to publish
- for (int i = 0; i < threadCount; i++) {
- PublisherThread pt = new PublisherThread(messagesPerThread);
- threads.add(pt);
- }
- }
-
- public List<String> getIDs() {
- return ids;
- }
-
- // Kick off threads
- public void start() throws Exception {
-
- for (PublisherThread pt : threads) {
- pt.start();
- }
- }
-
- // Wait for threads to complete. They will complete once they've published all of their messages.
- public void waitForCompletion() throws Exception {
-
- for (PublisherThread pt : threads) {
- pt.join();
- pt.close();
- }
- }
-
- private Session newSession(QueueConnection queueConnection) throws Exception {
- return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- private synchronized QueueConnection newQueueConnection() throws Exception {
-
- if (connectionFactory == null) {
- connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
- }
-
- // Set the redelivery count to -1 (infinite), or else messages will start dropping
- // after the queue has had a certain number of failures (default is 6)
- RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
- policy.setMaximumRedeliveries(-1);
-
- QueueConnection amqConnection = connectionFactory.createQueueConnection();
- amqConnection.start();
- return amqConnection;
- }
-
- private class PublisherThread extends Thread {
-
- private int count;
- private QueueConnection qc;
- private Session session;
- private MessageProducer mp;
-
- private PublisherThread(int count) throws Exception {
-
- this.count = count;
-
- // Each Thread has its own Connection and Session, so no sync worries
- qc = newQueueConnection();
- session = newSession(qc);
-
- // In our code, when publishing to multiple queues,
- // we're using composite destinations like below
- Queue q = new ActiveMQQueue(activemqQueues);
- mp = session.createProducer(q);
- }
-
- @Override
- public void run() {
-
- try {
-
- // Loop until we've published enough messages
- while (count-- > 0) {
-
- TextMessage tm = session.createTextMessage(getMessageText());
- String id = UUID.randomUUID().toString();
- tm.setStringProperty("KEY", id);
- ids.add(id); // keep track of the key to compare against consumer
-
- mp.send(tm);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- // Called by waitForCompletion
- public void close() {
-
- try {
- mp.close();
- }
- catch (Exception e) {
- }
-
- try {
- session.close();
- }
- catch (Exception e) {
- }
-
- try {
- qc.close();
- }
- catch (Exception e) {
- }
- }
- }
-
- }
-
- String messageText;
-
- private String getMessageText() {
-
- if (messageText == null) {
-
- synchronized (this) {
-
- if (messageText == null) {
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < messageSize; i++) {
- sb.append("X");
- }
- messageText = sb.toString();
- }
- }
- }
-
- return messageText;
- }
-
- public class ExportQueueConsumer {
-
- private final String amqUser = ActiveMQConnection.DEFAULT_USER;
- private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
- private final int totalToExpect;
- private ActiveMQConnectionFactory connectionFactory = null;
- private String activemqURL = null;
- private String activemqQueues = null;
- private String[] queues = null;
- // Map of IDs that were consumed, keyed by queue name.
- // We'll compare these against what was published to know if any got stuck or dropped.
- private Map<String, List<String>> idsByQueue = new HashMap<>();
- private Map<String, List<ConsumerThread>> threads;
-
- public ExportQueueConsumer(String activemqURL,
- String activemqQueues,
- int threadsPerQueue,
- int batchSize,
- int totalToExpect) throws Exception {
-
- this.activemqURL = activemqURL;
- this.activemqQueues = activemqQueues;
- this.totalToExpect = totalToExpect;
-
- queues = this.activemqQueues.split(",");
-
- for (int i = 0; i < queues.length; i++) {
- queues[i] = queues[i].trim();
- }
-
- threads = new HashMap<>();
-
- // For each queue, create a list of threads and set up the list of ids
- for (String q : queues) {
-
- List<ConsumerThread> list = new ArrayList<>();
-
- idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
-
- for (int i = 0; i < threadsPerQueue; i++) {
- list.add(new ConsumerThread(q, batchSize));
- }
-
- threads.put(q, list);
- }
- }
-
- public Map<String, List<String>> getIDs() {
- return idsByQueue;
- }
-
- // Start the threads
- public void start() throws Exception {
-
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- ct.start();
- }
- }
- }
-
- // Tell the threads to stop
- // Then wait for them to stop
- public void shutdown() throws Exception {
-
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- ct.shutdown();
- }
- }
-
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- ct.join();
- }
- }
- }
-
- private Session newSession(QueueConnection queueConnection) throws Exception {
- return queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- private synchronized QueueConnection newQueueConnection() throws Exception {
-
- if (connectionFactory == null) {
- connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
- }
-
- // Set the redelivery count to -1 (infinite), or else messages will start dropping
- // after the queue has had a certain number of failures (default is 6)
- RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
- policy.setMaximumRedeliveries(-1);
-
- QueueConnection amqConnection = connectionFactory.createQueueConnection();
- amqConnection.start();
- return amqConnection;
- }
-
- public boolean completed() {
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- if (ct.isAlive()) {
- LOG.info("thread for {} is still alive.", ct.qName);
- return false;
- }
- }
- }
- return true;
- }
-
- private class ConsumerThread extends Thread {
-
- private int batchSize;
- private QueueConnection qc;
- private Session session;
- private MessageConsumer mc;
- private List<String> idList;
- private boolean shutdown = false;
- private String qName;
-
- private ConsumerThread(String queueName, int batchSize) throws Exception {
-
- this.batchSize = batchSize;
-
- // Each thread has its own connection and session
- qName = queueName;
- qc = newQueueConnection();
- session = newSession(qc);
- Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
- mc = session.createConsumer(q);
-
- idList = idsByQueue.get(queueName);
- }
-
- @Override
- public void run() {
-
- try {
-
- int count = 0;
-
- // Keep reading as long as it hasn't been told to shutdown
- while (!shutdown) {
-
- if (idList.size() >= totalToExpect) {
- LOG.info("Got {} for q: {}", +idList.size(), qName);
- break;
- }
- Message m = mc.receive(4000);
-
- if (m != null) {
-
- // We received a non-null message, add the ID to our list
-
- idList.add(m.getStringProperty("KEY"));
-
- count++;
-
- // If we've reached our batch size, commit the batch and reset the count
-
- if (count == batchSize) {
- count = 0;
- }
- }
- else {
-
- // We didn't receive anything this time, commit any current batch and reset the count
-
- count = 0;
-
- // Sleep a little before trying to read after not getting a message
-
- try {
- if (idList.size() < totalToExpect) {
- LOG.info("did not receive on {}, current count: {}", qName, idList.size());
- }
- //sleep(3000);
- }
- catch (Exception e) {
- }
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- finally {
-
- // Once we exit, close everything
- close();
- }
- }
-
- public void shutdown() {
- shutdown = true;
- }
-
- public void close() {
-
- try {
- mc.close();
- }
- catch (Exception e) {
- }
-
- try {
- session.close();
- }
- catch (Exception e) {
- }
-
- try {
- qc.close();
- }
- catch (Exception e) {
-
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
deleted file mode 100644
index c7bc6d2..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-
-/*
- * pause producers if consumers stall and verify broker drained before resume
- */
-@RunWith(Parameterized.class)
-public class AMQ5266StarvedConsumerTest {
-
- static Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
- String activemqURL;
- BrokerService brokerService;
-
- public int messageSize = 1000;
-
- @Parameterized.Parameter(0)
- public int publisherMessagesPerThread = 1000;
-
- @Parameterized.Parameter(1)
- public int publisherThreadCount = 20;
-
- @Parameterized.Parameter(2)
- public int consumerThreadsPerQueue = 5;
-
- @Parameterized.Parameter(3)
- public int destMemoryLimit = 50 * 1024;
-
- @Parameterized.Parameter(4)
- public boolean useCache = true;
-
- @Parameterized.Parameter(5)
- public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
-
- @Parameterized.Parameter(6)
- public boolean optimizeDispatch = false;
- private AtomicBoolean didNotReceive = new AtomicBoolean(false);
-
- @Parameterized.Parameters(name = "#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
- public static Iterable<Object[]> parameters() {
- return Arrays.asList(new Object[][]{{1000, 40, 5, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, {1000, 40, 5, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, {1000, 40, 5, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
-
- {500, 20, 20, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, {500, 20, 20, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, {500, 20, 20, 1024 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},});
- }
-
- public int consumerBatchSize = 5;
-
- @Before
- public void startBroker() throws Exception {
- brokerService = new BrokerService();
- TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.setUseJmx(false);
- brokerService.setAdvisorySupport(false);
-
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
- defaultEntry.setMaxAuditDepth(publisherThreadCount);
- defaultEntry.setEnableAudit(true);
- defaultEntry.setUseCache(useCache);
- defaultEntry.setMaxPageSize(1000);
- defaultEntry.setOptimizedDispatch(optimizeDispatch);
- defaultEntry.setMemoryLimit(destMemoryLimit);
- defaultEntry.setExpireMessagesPeriod(0);
- policyMap.setDefaultEntry(defaultEntry);
- brokerService.setDestinationPolicy(policyMap);
-
- brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
-
- TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
- brokerService.start();
- activemqURL = transportConnector.getPublishableConnectString();
- }
-
- @After
- public void stopBroker() throws Exception {
- if (brokerService != null) {
- brokerService.stop();
- }
- }
-
- CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
- @Override
- public void run() {
- // wait for queue size to go to zero
- try {
- while (((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount() > 0) {
- LOG.info("Total messageCount: " + ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
- TimeUnit.SECONDS.sleep(5);
- }
- }
- catch (Exception ignored) {
- ignored.printStackTrace();
- }
- }
- });
-
- @Test(timeout = 30 * 60 * 1000)
- public void test() throws Exception {
-
- String activemqQueues = "activemq,activemq2,activemq3,activemq4";//,activemq5,activemq6,activemq7,activemq8,activemq9";
-
- int consumerWaitForConsumption = 5 * 60 * 1000;
-
- ExportQueuePublisher publisher = null;
- ExportQueueConsumer consumer = null;
-
- LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
- LOG.info("\nBuilding Publisher...");
-
- publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
-
- LOG.info("Building Consumer...");
-
- consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
-
- LOG.info("Starting Publisher...");
-
- publisher.start();
-
- LOG.info("Starting Consumer...");
-
- consumer.start();
-
- int distinctPublishedCount = 0;
-
- LOG.info("Waiting For Publisher Completion...");
-
- publisher.waitForCompletion();
-
- List<String> publishedIds = publisher.getIDs();
- distinctPublishedCount = new TreeSet<>(publishedIds).size();
-
- LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
-
- long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
- while (!consumer.completed() && System.currentTimeMillis() < endWait) {
- try {
- int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
- LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
- Thread.sleep(10000);
- }
- catch (Exception e) {
- }
- }
-
- LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down.");
-
- consumer.shutdown();
-
- LOG.info("Consumer Stats:");
-
- for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
-
- List<String> idList = entry.getValue();
-
- int distinctConsumed = new TreeSet<>(idList).size();
-
- StringBuilder sb = new StringBuilder();
- sb.append(" Queue: " + entry.getKey() +
- " -> Total Messages Consumed: " + idList.size() +
- ", Distinct IDs Consumed: " + distinctConsumed);
-
- int diff = distinctPublishedCount - distinctConsumed;
- sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
- LOG.info(sb.toString());
-
- assertEquals("expect to get all messages!", 0, diff);
-
- }
- }
-
- public class ExportQueuePublisher {
-
- private final String amqUser = ActiveMQConnection.DEFAULT_USER;
- private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
- private ActiveMQConnectionFactory connectionFactory = null;
- private String activemqURL = null;
- private String activemqQueues = null;
- // Collection of distinct IDs that the publisher has published.
- // After a message is published, its UUID will be written to this list for tracking.
- // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
- //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
- private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
- private List<PublisherThread> threads;
-
- public ExportQueuePublisher(String activemqURL,
- String activemqQueues,
- int messagesPerThread,
- int threadCount) throws Exception {
-
- this.activemqURL = activemqURL;
- this.activemqQueues = activemqQueues;
-
- threads = new ArrayList<>();
-
- // Build the threads and tell them how many messages to publish
- for (int i = 0; i < threadCount; i++) {
- PublisherThread pt = new PublisherThread(messagesPerThread);
- threads.add(pt);
- }
- }
-
- public List<String> getIDs() {
- return ids;
- }
-
- // Kick off threads
- public void start() throws Exception {
-
- for (PublisherThread pt : threads) {
- pt.start();
- }
- }
-
- // Wait for threads to complete. They will complete once they've published all of their messages.
- public void waitForCompletion() throws Exception {
-
- for (PublisherThread pt : threads) {
- pt.join();
- pt.close();
- }
- }
-
- private Session newSession(QueueConnection queueConnection) throws Exception {
- return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
- }
-
- private synchronized QueueConnection newQueueConnection() throws Exception {
-
- if (connectionFactory == null) {
- connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
- connectionFactory.setWatchTopicAdvisories(false);
- }
-
- // Set the redelivery count to -1 (infinite), or else messages will start dropping
- // after the queue has had a certain number of failures (default is 6)
- RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
- policy.setMaximumRedeliveries(-1);
-
- QueueConnection amqConnection = connectionFactory.createQueueConnection();
- amqConnection.start();
- return amqConnection;
- }
-
- private class PublisherThread extends Thread {
-
- private int count;
- private QueueConnection qc;
- private Session session;
- private MessageProducer mp;
- private Queue q;
-
- private PublisherThread(int count) throws Exception {
-
- this.count = count;
-
- // Each Thread has its own Connection and Session, so no sync worries
- qc = newQueueConnection();
- session = newSession(qc);
-
- // In our code, when publishing to multiple queues,
- // we're using composite destinations like below
- q = new ActiveMQQueue(activemqQueues);
- mp = session.createProducer(null);
- }
-
- @Override
- public void run() {
-
- try {
-
- // Loop until we've published enough messages
- while (count-- > 0) {
-
- TextMessage tm = session.createTextMessage(getMessageText());
- String id = UUID.randomUUID().toString();
- tm.setStringProperty("KEY", id);
- ids.add(id); // keep track of the key to compare against consumer
-
- mp.send(q, tm);
- session.commit();
-
- if (didNotReceive.get()) {
- globalProducerHalt.await();
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- // Called by waitForCompletion
- public void close() {
-
- try {
- mp.close();
- }
- catch (Exception e) {
- }
-
- try {
- session.close();
- }
- catch (Exception e) {
- }
-
- try {
- qc.close();
- }
- catch (Exception e) {
- }
- }
- }
-
- }
-
- String messageText;
-
- private String getMessageText() {
-
- if (messageText == null) {
-
- synchronized (this) {
-
- if (messageText == null) {
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < messageSize; i++) {
- sb.append("X");
- }
- messageText = sb.toString();
- }
- }
- }
-
- return messageText;
- }
-
- public class ExportQueueConsumer {
-
- private final String amqUser = ActiveMQConnection.DEFAULT_USER;
- private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
- private final int totalToExpect;
- private ActiveMQConnectionFactory connectionFactory = null;
- private String activemqURL = null;
- private String activemqQueues = null;
- private String[] queues = null;
- // Map of IDs that were consumed, keyed by queue name.
- // We'll compare these against what was published to know if any got stuck or dropped.
- private Map<String, List<String>> idsByQueue = new HashMap<>();
- private Map<String, List<ConsumerThread>> threads;
-
- public ExportQueueConsumer(String activemqURL,
- String activemqQueues,
- int threadsPerQueue,
- int batchSize,
- int totalToExpect) throws Exception {
-
- this.activemqURL = activemqURL;
- this.activemqQueues = activemqQueues;
- this.totalToExpect = totalToExpect;
-
- queues = this.activemqQueues.split(",");
-
- for (int i = 0; i < queues.length; i++) {
- queues[i] = queues[i].trim();
- }
-
- threads = new HashMap<>();
-
- // For each queue, create a list of threads and set up the list of ids
- for (String q : queues) {
-
- List<ConsumerThread> list = new ArrayList<>();
-
- idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
-
- for (int i = 0; i < threadsPerQueue; i++) {
- list.add(new ConsumerThread(q, batchSize));
- }
-
- threads.put(q, list);
- }
- }
-
- public Map<String, List<String>> getIDs() {
- return idsByQueue;
- }
-
- // Start the threads
- public void start() throws Exception {
-
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- ct.start();
- }
- }
- }
-
- // Tell the threads to stop
- // Then wait for them to stop
- public void shutdown() throws Exception {
-
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- ct.shutdown();
- }
- }
-
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- ct.join();
- }
- }
- }
-
- private Session newSession(QueueConnection queueConnection) throws Exception {
- return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
- }
-
- private synchronized QueueConnection newQueueConnection() throws Exception {
-
- if (connectionFactory == null) {
- connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
- connectionFactory.setWatchTopicAdvisories(false);
- }
-
- // Set the redelivery count to -1 (infinite), or else messages will start dropping
- // after the queue has had a certain number of failures (default is 6)
- RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
- policy.setMaximumRedeliveries(-1);
-
- QueueConnection amqConnection = connectionFactory.createQueueConnection();
- amqConnection.start();
- return amqConnection;
- }
-
- public boolean completed() {
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- if (ct.isAlive()) {
- LOG.info("thread for {} is still alive.", ct.qName);
- return false;
- }
- }
- }
- return true;
- }
-
- private class ConsumerThread extends Thread {
-
- private int batchSize;
- private QueueConnection qc;
- private Session session;
- private MessageConsumer mc;
- private List<String> idList;
- private boolean shutdown = false;
- private String qName;
-
- private ConsumerThread(String queueName, int batchSize) throws Exception {
-
- this.batchSize = batchSize;
-
- // Each thread has its own connection and session
- qName = queueName;
- qc = newQueueConnection();
- session = newSession(qc);
- Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
- mc = session.createConsumer(q);
-
- idList = idsByQueue.get(queueName);
- }
-
- @Override
- public void run() {
-
- try {
-
- int count = 0;
-
- // Keep reading as long as it hasn't been told to shutdown
- while (!shutdown) {
-
- if (idList.size() >= totalToExpect) {
- LOG.info("Got {} for q: {}", +idList.size(), qName);
- session.commit();
- break;
- }
- Message m = mc.receive(4000);
-
- if (m != null) {
-
- // We received a non-null message, add the ID to our list
-
- idList.add(m.getStringProperty("KEY"));
-
- count++;
-
- // If we've reached our batch size, commit the batch and reset the count
-
- if (count == batchSize) {
- session.commit();
- count = 0;
- }
- }
- else {
-
- // We didn't receive anything this time, commit any current batch and reset the count
-
- session.commit();
- count = 0;
-
- // Sleep a little before trying to read after not getting a message
-
- try {
- if (idList.size() < totalToExpect) {
- LOG.info("did not receive on {}, current count: {}", qName, idList.size());
- didNotReceive.set(true);
- }
- //sleep(3000);
- }
- catch (Exception e) {
- }
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- finally {
-
- // Once we exit, close everything
- close();
- }
- }
-
- public void shutdown() {
- shutdown = true;
- }
-
- public void close() {
-
- try {
- mc.close();
- }
- catch (Exception e) {
- }
-
- try {
- session.close();
- }
- catch (Exception e) {
- }
-
- try {
- qc.close();
- }
- catch (Exception e) {
-
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
deleted file mode 100644
index c5712b8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
+++ /dev/null
@@ -1,604 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Stuck messages test client.
- * <br>
- * Will kick of publisher and consumer simultaneously, and will usually result in stuck messages on the queue.
- */
-@RunWith(Parameterized.class)
-public class AMQ5266Test {
-
- static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
- String activemqURL = "tcp://localhost:61617";
- BrokerService brokerService;
-
- public int messageSize = 1000;
-
- @Parameterized.Parameter(0)
- public int publisherMessagesPerThread = 1000;
-
- @Parameterized.Parameter(1)
- public int publisherThreadCount = 20;
-
- @Parameterized.Parameter(2)
- public int consumerThreadsPerQueue = 5;
-
- @Parameterized.Parameter(3)
- public int destMemoryLimit = 50 * 1024;
-
- @Parameterized.Parameter(4)
- public boolean useCache = true;
-
- @Parameterized.Parameter(5)
- public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
-
- @Parameterized.Parameter(6)
- public boolean optimizeDispatch = false;
-
- @Parameterized.Parameters(name = "#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
- public static Iterable<Object[]> parameters() {
- return Arrays.asList(new Object[][]{{1, 1, 1, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true}, {1000, 20, 5, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, {100, 20, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.JDBC, false}, {1000, 5, 20, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false}, {1000, 20, 20, 1024 * 1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
-
- {1, 1, 1, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true}, {100, 5, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 20, 5, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {100, 20, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 5, 20, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false}, {1000, 20, 20, 1024 * 1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
-
- {1, 1, 1, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true}, {100, 5, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 20, 5, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {100, 20, 5, 50 * 1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 5, 20, 50 * 1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false}, {1000, 20, 20, 1024 * 1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
-
- });
- }
-
- public int consumerBatchSize = 5;
-
- @Before
- public void startBroker() throws Exception {
- brokerService = new BrokerService();
- TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.setUseJmx(false);
-
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setUseConsumerPriority(false); // java.lang.IllegalArgumentException: Comparison method violates its general contract!
- defaultEntry.setMaxAuditDepth(publisherThreadCount);
- defaultEntry.setEnableAudit(true);
- defaultEntry.setUseCache(useCache);
- defaultEntry.setMaxPageSize(1000);
- defaultEntry.setOptimizedDispatch(optimizeDispatch);
- defaultEntry.setMemoryLimit(destMemoryLimit);
- defaultEntry.setExpireMessagesPeriod(0);
- policyMap.setDefaultEntry(defaultEntry);
- brokerService.setDestinationPolicy(policyMap);
-
- brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
-
- TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
- brokerService.start();
- activemqURL = transportConnector.getPublishableConnectString();
- }
-
- @After
- public void stopBroker() throws Exception {
- if (brokerService != null) {
- brokerService.stop();
- }
- }
-
- @Test
- public void test() throws Exception {
-
- String activemqQueues = "activemq,activemq2";//,activemq3,activemq4,activemq5,activemq6,activemq7,activemq8,activemq9";
-
- int consumerWaitForConsumption = 5 * 60 * 1000;
-
- ExportQueuePublisher publisher = null;
- ExportQueueConsumer consumer = null;
-
- LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified.");
- LOG.info("\nBuilding Publisher...");
-
- publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
-
- LOG.info("Building Consumer...");
-
- consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
-
- LOG.info("Starting Publisher...");
-
- publisher.start();
-
- LOG.info("Starting Consumer...");
-
- consumer.start();
-
- int distinctPublishedCount = 0;
-
- LOG.info("Waiting For Publisher Completion...");
-
- publisher.waitForCompletion();
-
- List<String> publishedIds = publisher.getIDs();
- distinctPublishedCount = new TreeSet<>(publishedIds).size();
-
- LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
-
- long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
- while (!consumer.completed() && System.currentTimeMillis() < endWait) {
- try {
- int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
- LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
- Thread.sleep(10000);
- }
- catch (Exception e) {
- }
- }
-
- LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down.");
-
- consumer.shutdown();
-
- LOG.info("Consumer Stats:");
-
- for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
-
- List<String> idList = entry.getValue();
-
- int distinctConsumed = new TreeSet<>(idList).size();
-
- StringBuilder sb = new StringBuilder();
- sb.append(" Queue: " + entry.getKey() +
- " -> Total Messages Consumed: " + idList.size() +
- ", Distinct IDs Consumed: " + distinctConsumed);
-
- int diff = distinctPublishedCount - distinctConsumed;
- sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
- LOG.info(sb.toString());
-
- assertEquals("expect to get all messages!", 0, diff);
-
- }
- }
-
- public class ExportQueuePublisher {
-
- private final String amqUser = ActiveMQConnection.DEFAULT_USER;
- private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
- private ActiveMQConnectionFactory connectionFactory = null;
- private String activemqURL = null;
- private String activemqQueues = null;
- // Collection of distinct IDs that the publisher has published.
- // After a message is published, its UUID will be written to this list for tracking.
- // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs.
- //private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>());
- private List<String> ids = Collections.synchronizedList(new ArrayList<String>());
- private List<PublisherThread> threads;
-
- public ExportQueuePublisher(String activemqURL,
- String activemqQueues,
- int messagesPerThread,
- int threadCount) throws Exception {
-
- this.activemqURL = activemqURL;
- this.activemqQueues = activemqQueues;
-
- threads = new ArrayList<>();
-
- // Build the threads and tell them how many messages to publish
- for (int i = 0; i < threadCount; i++) {
- PublisherThread pt = new PublisherThread(messagesPerThread);
- threads.add(pt);
- }
- }
-
- public List<String> getIDs() {
- return ids;
- }
-
- // Kick off threads
- public void start() throws Exception {
-
- for (PublisherThread pt : threads) {
- pt.start();
- }
- }
-
- // Wait for threads to complete. They will complete once they've published all of their messages.
- public void waitForCompletion() throws Exception {
-
- for (PublisherThread pt : threads) {
- pt.join();
- pt.close();
- }
- }
-
- private Session newSession(QueueConnection queueConnection) throws Exception {
- return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
- }
-
- private synchronized QueueConnection newQueueConnection() throws Exception {
-
- if (connectionFactory == null) {
- connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
- }
-
- // Set the redelivery count to -1 (infinite), or else messages will start dropping
- // after the queue has had a certain number of failures (default is 6)
- RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
- policy.setMaximumRedeliveries(-1);
-
- QueueConnection amqConnection = connectionFactory.createQueueConnection();
- amqConnection.start();
- return amqConnection;
- }
-
- private class PublisherThread extends Thread {
-
- private int count;
- private QueueConnection qc;
- private Session session;
- private MessageProducer mp;
-
- private PublisherThread(int count) throws Exception {
-
- this.count = count;
-
- // Each Thread has its own Connection and Session, so no sync worries
- qc = newQueueConnection();
- session = newSession(qc);
-
- // In our code, when publishing to multiple queues,
- // we're using composite destinations like below
- Queue q = new ActiveMQQueue(activemqQueues);
- mp = session.createProducer(q);
- }
-
- @Override
- public void run() {
-
- try {
-
- // Loop until we've published enough messages
- while (count-- > 0) {
-
- TextMessage tm = session.createTextMessage(getMessageText());
- String id = UUID.randomUUID().toString();
- tm.setStringProperty("KEY", id);
- ids.add(id); // keep track of the key to compare against consumer
-
- mp.send(tm);
- session.commit();
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- // Called by waitForCompletion
- public void close() {
-
- try {
- mp.close();
- }
- catch (Exception e) {
- }
-
- try {
- session.close();
- }
- catch (Exception e) {
- }
-
- try {
- qc.close();
- }
- catch (Exception e) {
- }
- }
- }
-
- }
-
- String messageText;
-
- private String getMessageText() {
-
- if (messageText == null) {
-
- synchronized (this) {
-
- if (messageText == null) {
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < messageSize; i++) {
- sb.append("X");
- }
- messageText = sb.toString();
- }
- }
- }
-
- return messageText;
- }
-
- public class ExportQueueConsumer {
-
- private final String amqUser = ActiveMQConnection.DEFAULT_USER;
- private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
- private final int totalToExpect;
- private ActiveMQConnectionFactory connectionFactory = null;
- private String activemqURL = null;
- private String activemqQueues = null;
- private String[] queues = null;
- // Map of IDs that were consumed, keyed by queue name.
- // We'll compare these against what was published to know if any got stuck or dropped.
- private Map<String, List<String>> idsByQueue = new HashMap<>();
- private Map<String, List<ConsumerThread>> threads;
-
- public ExportQueueConsumer(String activemqURL,
- String activemqQueues,
- int threadsPerQueue,
- int batchSize,
- int totalToExpect) throws Exception {
-
- this.activemqURL = activemqURL;
- this.activemqQueues = activemqQueues;
- this.totalToExpect = totalToExpect;
-
- queues = this.activemqQueues.split(",");
-
- for (int i = 0; i < queues.length; i++) {
- queues[i] = queues[i].trim();
- }
-
- threads = new HashMap<>();
-
- // For each queue, create a list of threads and set up the list of ids
- for (String q : queues) {
-
- List<ConsumerThread> list = new ArrayList<>();
-
- idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
-
- for (int i = 0; i < threadsPerQueue; i++) {
- list.add(new ConsumerThread(q, batchSize));
- }
-
- threads.put(q, list);
- }
- }
-
- public Map<String, List<String>> getIDs() {
- return idsByQueue;
- }
-
- // Start the threads
- public void start() throws Exception {
-
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- ct.start();
- }
- }
- }
-
- // Tell the threads to stop
- // Then wait for them to stop
- public void shutdown() throws Exception {
-
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- ct.shutdown();
- }
- }
-
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- ct.join();
- }
- }
- }
-
- private Session newSession(QueueConnection queueConnection) throws Exception {
- return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
- }
-
- private synchronized QueueConnection newQueueConnection() throws Exception {
-
- if (connectionFactory == null) {
- connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
- }
-
- // Set the redelivery count to -1 (infinite), or else messages will start dropping
- // after the queue has had a certain number of failures (default is 6)
- RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
- policy.setMaximumRedeliveries(-1);
-
- QueueConnection amqConnection = connectionFactory.createQueueConnection();
- amqConnection.start();
- return amqConnection;
- }
-
- public boolean completed() {
- for (List<ConsumerThread> list : threads.values()) {
-
- for (ConsumerThread ct : list) {
-
- if (ct.isAlive()) {
- LOG.info("thread for {} is still alive.", ct.qName);
- return false;
- }
- }
- }
- return true;
- }
-
- private class ConsumerThread extends Thread {
-
- private int batchSize;
- private QueueConnection qc;
- private Session session;
- private MessageConsumer mc;
- private List<String> idList;
- private boolean shutdown = false;
- private String qName;
-
- private ConsumerThread(String queueName, int batchSize) throws Exception {
-
- this.batchSize = batchSize;
-
- // Each thread has its own connection and session
- qName = queueName;
- qc = newQueueConnection();
- session = newSession(qc);
- Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
- mc = session.createConsumer(q);
-
- idList = idsByQueue.get(queueName);
- }
-
- @Override
- public void run() {
-
- try {
-
- int count = 0;
-
- // Keep reading as long as it hasn't been told to shutdown
- while (!shutdown) {
-
- if (idList.size() >= totalToExpect) {
- LOG.info("Got {} for q: {}", +idList.size(), qName);
- session.commit();
- break;
- }
- Message m = mc.receive(4000);
-
- if (m != null) {
-
- // We received a non-null message, add the ID to our list
-
- idList.add(m.getStringProperty("KEY"));
-
- count++;
-
- // If we've reached our batch size, commit the batch and reset the count
-
- if (count == batchSize) {
- session.commit();
- count = 0;
- }
- }
- else {
-
- // We didn't receive anything this time, commit any current batch and reset the count
-
- session.commit();
- count = 0;
-
- // Sleep a little before trying to read after not getting a message
-
- try {
- if (idList.size() < totalToExpect) {
- LOG.info("did not receive on {}, current count: {}", qName, idList.size());
- }
- //sleep(3000);
- }
- catch (Exception e) {
- }
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- finally {
-
- // Once we exit, close everything
- close();
- }
- }
-
- public void shutdown() {
- shutdown = true;
- }
-
- public void close() {
-
- try {
- mc.close();
- }
- catch (Exception e) {
- }
-
- try {
- session.close();
- }
- catch (Exception e) {
- }
-
- try {
- qc.close();
- }
- catch (Exception e) {
-
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
deleted file mode 100644
index d4c02fb..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class AMQ5274Test {
-
- static Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class);
- String activemqURL;
- BrokerService brokerService;
- ActiveMQQueue dest = new ActiveMQQueue("TestQ");
-
- @Before
- public void startBroker() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultPolicy = new PolicyEntry();
- defaultPolicy.setExpireMessagesPeriod(1000);
- policyMap.setDefaultEntry(defaultPolicy);
- brokerService.setDestinationPolicy(policyMap);
- activemqURL = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
- brokerService.start();
- }
-
- @After
- public void stopBroker() throws Exception {
- if (brokerService != null) {
- brokerService.stop();
- }
- }
-
- @Test
- public void test() throws Exception {
- LOG.info("Starting Test");
- assertTrue(brokerService.isStarted());
-
- produce();
- consumeAndRollback();
-
- // check reported queue size using JMX
- long queueSize = getQueueSize();
- assertEquals("Queue " + dest.getPhysicalName() + " not empty, reporting " + queueSize + " messages.", 0, queueSize);
- }
-
- private void consumeAndRollback() throws JMSException, InterruptedException {
- ActiveMQConnection connection = createConnection();
- RedeliveryPolicy noRedelivery = new RedeliveryPolicy();
- noRedelivery.setMaximumRedeliveries(0);
- connection.setRedeliveryPolicy(noRedelivery);
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = session.createConsumer(dest);
- Message m;
- while ((m = consumer.receive(4000)) != null) {
- LOG.info("Got:" + m);
- TimeUnit.SECONDS.sleep(1);
- session.rollback();
- }
- connection.close();
- }
-
- private void produce() throws Exception {
- Connection connection = createConnection();
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(dest);
- producer.setTimeToLive(10000);
- for (int i = 0; i < 20; i++) {
- producer.send(session.createTextMessage("i=" + i));
- }
- connection.close();
- }
-
- private ActiveMQConnection createConnection() throws JMSException {
- return (ActiveMQConnection) new ActiveMQConnectionFactory(activemqURL).createConnection();
- }
-
- public long getQueueSize() throws Exception {
- long queueSize = 0;
- try {
- QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(), dest), QueueViewMBean.class, false);
- queueSize = queueViewMBean.getQueueSize();
- LOG.info("QueueSize for destination {} is {}", dest, queueSize);
- }
- catch (Exception ex) {
- LOG.error("Error retrieving QueueSize from JMX ", ex);
- throw ex;
- }
- return queueSize;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
deleted file mode 100644
index a05d56d..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Arrays;
-import java.util.Random;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-public class AMQ5381Test {
-
- public static final byte[] ORIG_MSG_CONTENT = randomByteArray();
- public static final String AMQ5381_EXCEPTION_MESSAGE = "java.util.zip.DataFormatException: incorrect header check";
-
- private BrokerService brokerService;
- private String brokerURI;
-
- @Rule
- public TestName name = new TestName();
-
- @Before
- public void startBroker() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.setUseJmx(false);
- brokerService.addConnector("tcp://localhost:0");
- brokerService.start();
- brokerService.waitUntilStarted();
-
- brokerURI = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
- }
-
- @After
- public void stopBroker() throws Exception {
- if (brokerService != null) {
- brokerService.stop();
- }
- }
-
- private ActiveMQConnection createConnection(boolean useCompression) throws Exception {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
- factory.setUseCompression(useCompression);
- Connection connection = factory.createConnection();
- connection.start();
- return (ActiveMQConnection) connection;
- }
-
- @Test
- public void amq5381Test() throws Exception {
-
- // Consumer Configured for (useCompression=true)
- final ActiveMQConnection consumerConnection = createConnection(true);
- final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue consumerQueue = consumerSession.createQueue(name.getMethodName());
- final MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
-
- // Producer Configured for (useCompression=false)
- final ActiveMQConnection producerConnection = createConnection(false);
- final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue producerQueue = producerSession.createQueue(name.getMethodName());
-
- try {
-
- final ActiveMQBytesMessage messageProduced = (ActiveMQBytesMessage) producerSession.createBytesMessage();
- messageProduced.writeBytes(ORIG_MSG_CONTENT);
- Assert.assertFalse(messageProduced.isReadOnlyBody());
-
- Assert.assertFalse("Produced Message's 'compressed' flag should remain false until the message is sent (where it will be compressed, if necessary)", messageProduced.isCompressed());
-
- final MessageProducer producer = producerSession.createProducer(null);
- producer.send(producerQueue, messageProduced);
-
- Assert.assertEquals("Once sent, the produced Message's 'compressed' flag should match its Connection's 'useCompression' flag", producerConnection.isUseCompression(), messageProduced.isCompressed());
-
- final ActiveMQBytesMessage messageConsumed = (ActiveMQBytesMessage) consumer.receive();
- Assert.assertNotNull(messageConsumed);
- Assert.assertTrue("Consumed Message should be read-only", messageConsumed.isReadOnlyBody());
- Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag", messageProduced.isCompressed(), messageConsumed.isCompressed());
-
- // ensure consumed message content matches what was originally set
- final byte[] consumedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
- messageConsumed.readBytes(consumedMsgContent);
-
- Assert.assertTrue("Consumed Message content should match the original Message content", Arrays.equals(ORIG_MSG_CONTENT, consumedMsgContent));
-
- // make message writable so the consumer can modify and reuse it
- makeWritable(messageConsumed);
-
- // modify message, attempt to trigger DataFormatException due
- // to old incorrect compression logic
- try {
- messageConsumed.setStringProperty(this.getClass().getName(), "test");
- }
- catch (JMSException jmsE) {
- if (AMQ5381_EXCEPTION_MESSAGE.equals(jmsE.getMessage())) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- jmsE.printStackTrace(pw);
-
- Assert.fail("AMQ5381 Error State Achieved: attempted to decompress BytesMessage contents that are not compressed\n" + sw.toString());
- }
- else {
- throw jmsE;
- }
- }
-
- Assert.assertEquals("The consumed Message's 'compressed' flag should still match the produced Message's 'compressed' flag after it has been made writable", messageProduced.isCompressed(), messageConsumed.isCompressed());
-
- // simulate re-publishing message
- simulatePublish(messageConsumed);
-
- // ensure consumed message content matches what was originally set
- final byte[] modifiedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
- messageConsumed.readBytes(modifiedMsgContent);
-
- Assert.assertTrue("After the message properties are modified and it is re-published, its message content should still match the original message content", Arrays.equals(ORIG_MSG_CONTENT, modifiedMsgContent));
- }
- finally {
- producerSession.close();
- producerConnection.close();
- consumerSession.close();
- consumerConnection.close();
- }
- }
-
- protected static final int MAX_RANDOM_BYTE_ARRAY_SIZE_KB = 128;
-
- protected static byte[] randomByteArray() {
- final Random random = new Random();
- final byte[] byteArray = new byte[random.nextInt(MAX_RANDOM_BYTE_ARRAY_SIZE_KB * 1024)];
- random.nextBytes(byteArray);
-
- return byteArray;
- }
-
- protected static void makeWritable(final ActiveMQMessage message) {
- message.setReadOnlyBody(false);
- message.setReadOnlyProperties(false);
- }
-
- protected static void simulatePublish(final ActiveMQBytesMessage message) throws JMSException {
- message.reset();
- message.onSend();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
deleted file mode 100644
index 0e9e310..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Exercises a broker whose default destination policy uses an
- * {@link AbortSlowAckConsumerStrategy} against a large number of temporary queues: every
- * queue gets one producer, one message and one consumer on a single session.
- * <p>
- * NOTE(review): the class name suggests this is the regression test for issue AMQ-5421;
- * the issue itself is not described in this file.
- */
-public class AMQ5421Test {
-
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ5421Test.class);
-
-   /** Number of temporary queues (and producers/consumers) created by the test. */
-   private static final int DEST_COUNT = 1000;
-   private final Destination[] destination = new Destination[DEST_COUNT];
-   private final MessageProducer[] producer = new MessageProducer[DEST_COUNT];
-   private BrokerService brokerService;
-   /** Connect string of the broker's TCP connector; assigned in {@link #setUp()}. */
-   private String connectionUri;
-
-   /**
-    * Creates a client connection factory for the broker started in {@link #setUp()},
-    * with topic-advisory watching switched off.
-    *
-    * @return a factory pointing at {@code connectionUri}
-    * @throws Exception if the factory cannot be created
-    */
-   protected ConnectionFactory createConnectionFactory() throws Exception {
-      ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(connectionUri);
-      conFactory.setWatchTopicAdvisories(false);
-      return conFactory;
-   }
-
-   /**
-    * Builds the slow-consumer strategy installed as part of the broker's default policy.
-    *
-    * @return a strategy with check period 2000, max time since last ack 5000 and
-    *         idle consumers not ignored
-    */
-   protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
-      AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
-      strategy.setCheckPeriod(2000);
-      strategy.setMaxTimeSinceLastAck(5000);
-      strategy.setIgnoreIdleConsumers(false);
-
-      return strategy;
-   }
-
-   /**
-    * Starts a non-persistent, JMX-enabled broker with the slow-consumer policy as default
-    * entry and a TCP connector on an ephemeral port, then records its connect string.
-    *
-    * @throws Exception if the broker cannot be created or started
-    */
-   @Before
-   public void setUp() throws Exception {
-      brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
-      PolicyEntry policy = new PolicyEntry();
-
-      policy.setSlowConsumerStrategy(createSlowConsumerStrategy());
-      policy.setQueuePrefetch(10);
-      policy.setTopicPrefetch(10);
-      PolicyMap pMap = new PolicyMap();
-      pMap.setDefaultEntry(policy);
-      brokerService.setDestinationPolicy(pMap);
-      brokerService.addConnector("tcp://0.0.0.0:0");
-      brokerService.start();
-
-      connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
-   }
-
-   /**
-    * Creates {@code DEST_COUNT} temporary queues, then for each one sends a text message
-    * and receives it back through a fresh consumer.
-    *
-    * @throws Exception on any JMS failure
-    */
-   @Test
-   public void testManyTempDestinations() throws Exception {
-      Connection connection = createConnectionFactory().createConnection();
-      try {
-         connection.start();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         for (int i = 0; i < DEST_COUNT; i++) {
-            destination[i] = session.createTemporaryQueue();
-            LOG.debug("Created temp queue: {}", i);
-         }
-
-         for (int i = 0; i < DEST_COUNT; i++) {
-            producer[i] = session.createProducer(destination[i]);
-            LOG.debug("Created producer: {}", i);
-            TextMessage msg = session.createTextMessage(" testMessage " + i);
-            producer[i].send(msg);
-            LOG.debug("message sent: {}", i);
-            // Consumers are deliberately not closed one by one, as in the original test;
-            // closing the connection below releases them.
-            MessageConsumer consumer = session.createConsumer(destination[i]);
-            Message message = consumer.receive(1000);
-            // receive() returns null on timeout; fail with a readable message, not an NPE.
-            Assert.assertNotNull("No message received from temp queue " + i + " within 1000 ms", message);
-            Assert.assertTrue(message.equals(msg));
-         }
-
-         for (int i = 0; i < DEST_COUNT; i++) {
-            producer[i].close();
-         }
-      }
-      finally {
-         // Always release the connection so a failed assertion does not leave it, and
-         // everything created from it, open while the broker is being stopped.
-         connection.close();
-      }
-   }
-
-   /**
-    * Stops the broker started in {@link #setUp()} and waits for it to finish stopping.
-    *
-    * @throws Exception if the broker fails to stop
-    */
-   @After
-   public void tearDown() throws Exception {
-      // @After also runs when setUp() failed before the broker was assigned.
-      if (brokerService != null) {
-         brokerService.stop();
-         brokerService.waitUntilStopped();
-      }
-   }
-}