You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/16 16:21:54 UTC
[25/61] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java
deleted file mode 100644
index ad12f71..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.leveldb.LevelDBStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a test case for the issue reported at:
- * https://issues.apache.org/activemq/browse/AMQ-1866
- *
- * If you have a JMS producer sending messages to multiple fast consumers and
- * one slow consumer, eventually all consumers will run as slow as
- * the slowest consumer.
- */
-public class AMQ1866 extends TestCase {
-
- private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);
- private BrokerService brokerService;
- private ArrayList<Thread> threads = new ArrayList<>();
-
- private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
- private String ACTIVEMQ_BROKER_URI;
-
- AtomicBoolean shutdown = new AtomicBoolean();
- private ActiveMQQueue destination;
-
- @Override
- protected void setUp() throws Exception {
- // Start an embedded broker up.
- brokerService = new BrokerService();
- LevelDBStore adaptor = new LevelDBStore();
- brokerService.setPersistenceAdapter(adaptor);
- brokerService.deleteAllMessages();
-
- // A small max page size makes this issue occur faster.
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry pe = new PolicyEntry();
- pe.setMaxPageSize(1);
- policyMap.put(new ActiveMQQueue(">"), pe);
- brokerService.setDestinationPolicy(policyMap);
-
- brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
- brokerService.start();
-
- ACTIVEMQ_BROKER_URI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
- destination = new ActiveMQQueue(getName());
- }
-
- @Override
- protected void tearDown() throws Exception {
- // Stop any running threads.
- shutdown.set(true);
- for (Thread t : threads) {
- t.interrupt();
- t.join();
- }
- brokerService.stop();
- }
-
- public void testConsumerSlowDownPrefetch0() throws Exception {
- ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=0";
- doTestConsumerSlowDown();
- }
-
- public void testConsumerSlowDownPrefetch10() throws Exception {
- ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=10";
- doTestConsumerSlowDown();
- }
-
- public void testConsumerSlowDownDefaultPrefetch() throws Exception {
- doTestConsumerSlowDown();
- }
-
- public void doTestConsumerSlowDown() throws Exception {
-
- // Preload the queue.
- produce(20000);
-
- Thread producer = new Thread() {
- @Override
- public void run() {
- try {
- while (!shutdown.get()) {
- produce(1000);
- }
- }
- catch (Exception e) {
- }
- }
- };
- threads.add(producer);
- producer.start();
-
- // This is the slow consumer.
- ConsumerThread c1 = new ConsumerThread("Consumer-1");
- threads.add(c1);
- c1.start();
-
- // Wait a bit so that the slow consumer gets assigned most of the messages.
- Thread.sleep(500);
- ConsumerThread c2 = new ConsumerThread("Consumer-2");
- threads.add(c2);
- c2.start();
-
- int totalReceived = 0;
- for (int i = 0; i < 30; i++) {
- Thread.sleep(1000);
- long c1Counter = c1.counter.getAndSet(0);
- long c2Counter = c2.counter.getAndSet(0);
- log.debug("c1: " + c1Counter + ", c2: " + c2Counter);
- totalReceived += c1Counter;
- totalReceived += c2Counter;
-
- // Once message have been flowing for a few seconds, start asserting that c2 always gets messages. It should be receiving about 100 / sec
- if (i > 10) {
- assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0);
- }
- }
- }
-
- public void produce(int count) throws Exception {
- Connection connection = null;
- try {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
- factory.setDispatchAsync(true);
-
- connection = factory.createConnection();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- connection.start();
-
- for (int i = 0; i < count; i++) {
- producer.send(session.createTextMessage(getName() + " Message " + (++i)));
- }
-
- }
- finally {
- try {
- connection.close();
- }
- catch (Throwable e) {
- }
- }
- }
-
- public class ConsumerThread extends Thread {
-
- final AtomicLong counter = new AtomicLong();
-
- public ConsumerThread(String threadId) {
- super(threadId);
- }
-
- @Override
- public void run() {
- Connection connection = null;
- try {
- log.debug(getName() + ": is running");
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
- factory.setDispatchAsync(true);
-
- connection = factory.createConnection();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(destination);
- connection.start();
-
- while (!shutdown.get()) {
- TextMessage msg = (TextMessage) consumer.receive(1000);
- if (msg != null) {
- int sleepingTime;
- if (getName().equals("Consumer-1")) {
- sleepingTime = 1000 * 1000;
- }
- else {
- sleepingTime = 1;
- }
- counter.incrementAndGet();
- Thread.sleep(sleepingTime);
- }
- }
-
- }
- catch (Exception e) {
- }
- finally {
- log.debug(getName() + ": is stopping");
- try {
- connection.close();
- }
- catch (Throwable e) {
- }
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
deleted file mode 100644
index b9cb919..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class AMQ1893Test extends TestCase {
-
- private static final Logger log = LoggerFactory.getLogger(AMQ1893Test.class);
-
- static final String QUEUE_NAME = "TEST";
-
- static final int MESSAGE_COUNT_OF_ONE_GROUP = 10000;
-
- static final int[] PRIORITIES = new int[]{0, 5, 10};
-
- static final boolean debug = false;
-
- private BrokerService brokerService;
-
- private ActiveMQQueue destination;
-
- @Override
- protected void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.addConnector("tcp://localhost:0");
- brokerService.start();
- destination = new ActiveMQQueue(QUEUE_NAME);
- }
-
- @Override
- protected void tearDown() throws Exception {
- // Stop any running threads.
- brokerService.stop();
- }
-
- public void testProduceConsumeWithSelector() throws Exception {
- new TestProducer().produceMessages();
- new TestConsumer().consume();
- }
-
- class TestProducer {
-
- public void produceMessages() throws Exception {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString());
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue(QUEUE_NAME);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- long start = System.currentTimeMillis();
-
- for (int priority : PRIORITIES) {
-
- String name = null;
- if (priority == 10) {
- name = "high";
- }
- else if (priority == 5) {
- name = "mid";
- }
- else {
- name = "low";
- }
-
- for (int i = 1; i <= MESSAGE_COUNT_OF_ONE_GROUP; i++) {
-
- TextMessage message = session.createTextMessage(name + "_" + i);
- message.setIntProperty("priority", priority);
-
- producer.send(message);
- }
- }
-
- long end = System.currentTimeMillis();
-
- log.info("sent " + (MESSAGE_COUNT_OF_ONE_GROUP * 3) + " messages in " + (end - start) + " ms");
-
- producer.close();
- session.close();
- connection.close();
- }
- }
-
- class TestConsumer {
-
- private CountDownLatch finishLatch = new CountDownLatch(1);
-
- public void consume() throws Exception {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString());
-
- final int totalMessageCount = MESSAGE_COUNT_OF_ONE_GROUP * PRIORITIES.length;
- final AtomicInteger counter = new AtomicInteger();
- final MessageListener listener = new MessageListener() {
- @Override
- public void onMessage(Message message) {
-
- if (debug) {
- try {
- log.info(((TextMessage) message).getText());
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
-
- if (counter.incrementAndGet() == totalMessageCount) {
-
- finishLatch.countDown();
-
- }
- }
- };
-
- int consumerCount = PRIORITIES.length;
- Connection[] connections = new Connection[consumerCount];
- Session[] sessions = new Session[consumerCount];
- MessageConsumer[] consumers = new MessageConsumer[consumerCount];
-
- for (int i = 0; i < consumerCount; i++) {
- String selector = "priority = " + PRIORITIES[i];
-
- connections[i] = connectionFactory.createConnection();
- sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- consumers[i] = sessions[i].createConsumer(destination, selector);
- consumers[i].setMessageListener(listener);
- }
-
- for (Connection connection : connections) {
- connection.start();
- }
-
- log.info("received " + counter.get() + " messages");
-
- assertTrue("got all messages in time", finishLatch.await(60, TimeUnit.SECONDS));
-
- log.info("received " + counter.get() + " messages");
-
- for (MessageConsumer consumer : consumers) {
- consumer.close();
- }
-
- for (Session session : sessions) {
- session.close();
- }
-
- for (Connection connection : connections) {
- connection.close();
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
deleted file mode 100644
index a7eb699..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import junit.framework.TestCase;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-
-public class AMQ1917Test extends TestCase {
-
- private static final int NUM_MESSAGES = 4000;
- private static final int NUM_THREADS = 10;
- private static final String REQUEST_QUEUE = "mock.in.queue";
- private static final String REPLY_QUEUE = "mock.out.queue";
-
- private Destination requestDestination = ActiveMQDestination.createDestination(REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
- private Destination replyDestination = ActiveMQDestination.createDestination(REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
-
- private CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
- private CountDownLatch errorLatch = new CountDownLatch(1);
- private ThreadPoolExecutor tpe;
- private final String BROKER_URL = "tcp://localhost:0";
- private String connectionUri;
- private BrokerService broker = null;
- private boolean working = true;
-
- // trival session/producer pool
- final Session[] sessions = new Session[NUM_THREADS];
- final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
-
- @Override
- public void setUp() throws Exception {
- broker = new BrokerService();
- broker.setPersistent(false);
- broker.addConnector(BROKER_URL);
- broker.start();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-
- BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10000);
- tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000, TimeUnit.MILLISECONDS, queue);
- ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
- tpe.setThreadFactory(limitedthreadFactory);
- }
-
- @Override
- public void tearDown() throws Exception {
- broker.stop();
- tpe.shutdown();
- }
-
- public void testLoadedSendReceiveWithCorrelationId() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
- connectionFactory.setBrokerURL(connectionUri);
- Connection connection = connectionFactory.createConnection();
- setupReceiver(connection);
-
- connection = connectionFactory.createConnection();
- connection.start();
-
- // trival session/producer pool
- for (int i = 0; i < NUM_THREADS; i++) {
- sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producers[i] = sessions[i].createProducer(requestDestination);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++) {
- MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination, replyDestination, "Test Message : " + i);
- tpe.execute(msr);
- }
-
- while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) {
- if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) {
- fail("there was an error, check the console for thread or thread allocation failure");
- break;
- }
- }
- working = false;
- }
-
- private void setupReceiver(final Connection connection) throws Exception {
-
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = session.createConsumer(requestDestination);
- final MessageProducer sender = session.createProducer(replyDestination);
- connection.start();
-
- new Thread() {
- @Override
- public void run() {
- while (working) {
- // wait for messages in infinitive loop
- // time out is set to show the client is awaiting
- try {
- TextMessage msg = (TextMessage) consumer.receive(20000);
- if (msg == null) {
- errorLatch.countDown();
- fail("Response timed out." + " latchCount=" + roundTripLatch.getCount());
- }
- else {
- String result = msg.getText();
- //System.out.println("Request:" + (i++)
- // + ", msg=" + result + ", ID" + msg.getJMSMessageID());
- TextMessage response = session.createTextMessage();
- response.setJMSCorrelationID(msg.getJMSMessageID());
- response.setText(result);
- sender.send(response);
- }
- }
- catch (JMSException e) {
- if (working) {
- errorLatch.countDown();
- fail("Unexpected exception:" + e);
- }
- }
- }
- }
- }.start();
- }
-
- class MessageSenderReceiver implements Runnable {
-
- Destination reqDest;
- Destination replyDest;
- String origMsg;
-
- public MessageSenderReceiver(Destination reqDest, Destination replyDest, String msg) throws Exception {
- this.replyDest = replyDest;
- this.reqDest = reqDest;
- this.origMsg = msg;
- }
-
- private int getIndexFromCurrentThread() {
- String name = Thread.currentThread().getName();
- String num = name.substring(name.lastIndexOf('-') + 1);
- int idx = Integer.parseInt(num) - 1;
- assertTrue("idx is in range: idx=" + idx, idx < NUM_THREADS);
- return idx;
- }
-
- @Override
- public void run() {
- try {
- // get thread session and producer from pool
- int threadIndex = getIndexFromCurrentThread();
- Session session = sessions[threadIndex];
- MessageProducer producer = producers[threadIndex];
-
- final Message sendJmsMsg = session.createTextMessage(origMsg);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(sendJmsMsg);
-
- String jmsId = sendJmsMsg.getJMSMessageID();
- String selector = "JMSCorrelationID='" + jmsId + "'";
-
- MessageConsumer consumer = session.createConsumer(replyDest, selector);
- Message receiveJmsMsg = consumer.receive(2000);
- consumer.close();
- if (receiveJmsMsg == null) {
- errorLatch.countDown();
- fail("Unable to receive response for:" + origMsg + ", with selector=" + selector);
- }
- else {
- //System.out.println("received response message :"
- // + ((TextMessage) receiveJmsMsg).getText()
- // + " with selector : " + selector);
- roundTripLatch.countDown();
- }
- }
- catch (JMSException e) {
- fail("unexpected exception:" + e);
- }
- }
- }
-
- public class LimitedThreadFactory implements ThreadFactory {
-
- int threadCount;
- private ThreadFactory factory;
-
- public LimitedThreadFactory(ThreadFactory threadFactory) {
- this.factory = threadFactory;
- }
-
- @Override
- public Thread newThread(Runnable arg0) {
- if (++threadCount > NUM_THREADS) {
- errorLatch.countDown();
- fail("too many threads requested");
- }
- return factory.newThread(arg0);
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
deleted file mode 100644
index 6e49550..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.NamingException;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.Wait;
-import org.apache.log4j.Logger;
-
-/**
- * AMQ1936Test
- */
-public class AMQ1936Test extends TestCase {
-
- private final static Logger logger = Logger.getLogger(AMQ1936Test.class);
- private final static String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue";
- // //--
- //
- private final static long TEST_MESSAGE_COUNT = 6000; // The number of test messages to use
- //
- // //--
- private final static int CONSUMER_COUNT = 2; // The number of message receiver instances
- private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be
- // processed within a JMS transaction
-
- private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CONSUMER_COUNT, CONSUMER_COUNT, Long.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- private final ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[CONSUMER_COUNT];
- private BrokerService broker = null;
- static QueueConnectionFactory connectionFactory = null;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
-
- broker = new BrokerService();
- broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024);
- broker.setBrokerName("test");
- broker.setDeleteAllMessagesOnStartup(true);
- broker.start();
- connectionFactory = new ActiveMQConnectionFactory("vm://test");
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
-
- if (threadPool != null) {
- // signal receivers to stop
- for (ThreadedMessageReceiver receiver : receivers) {
- receiver.setShouldStop(true);
- }
-
- logger.info("Waiting for receivers to shutdown..");
- if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
- logger.warn("Not all receivers completed shutdown.");
- }
- else {
- logger.info("All receivers shutdown successfully..");
- }
- }
-
- logger.debug("Stoping the broker.");
-
- if (broker != null) {
- broker.stop();
- }
- }
-
- private void sendTextMessage(String queueName, int i) throws JMSException, NamingException {
- QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test");
- QueueConnection queueConnection = null;
- QueueSession session = null;
- QueueSender sender = null;
- Queue queue = null;
- TextMessage message = null;
-
- try {
-
- // Create the queue connection
- queueConnection = connectionFactory.createQueueConnection();
-
- session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(TEST_QUEUE_NAME);
- sender = session.createSender(queue);
- sender.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- message = session.createTextMessage(String.valueOf(i));
-
- // send the message
- sender.send(message);
-
- if (session.getTransacted()) {
- session.commit();
- }
- if (i % 1000 == 0) {
- logger.info("Message successfully sent to : " + queue.getQueueName() + " messageid: " + message.getJMSMessageID() + " content:" + message.getText());
- }
- }
- finally {
- if (sender != null) {
- sender.close();
- }
- if (session != null) {
- session.close();
- }
- if (queueConnection != null) {
- queueConnection.close();
- }
- }
- }
-
- public void testForDuplicateMessages() throws Exception {
- final ConcurrentHashMap<String, String> messages = new ConcurrentHashMap<>();
- final Object lock = new Object();
- final CountDownLatch duplicateSignal = new CountDownLatch(1);
- final AtomicInteger messageCount = new AtomicInteger(0);
-
- // add 1/2 the number of our total messages
- for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) {
- if (duplicateSignal.getCount() == 0) {
- fail("Duplicate message id detected");
- }
- sendTextMessage(TEST_QUEUE_NAME, i);
- }
-
- // create a number of consumers to read of the messages and start them with a handler which simply stores the
- // message ids
- // in a Map and checks for a duplicate
- for (int i = 0; i < CONSUMER_COUNT; i++) {
- receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler() {
-
- @Override
- public void onMessage(Message message) throws Exception {
- synchronized (lock) {
- int current = messageCount.incrementAndGet();
- if (current % 1000 == 0) {
- logger.info("Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage) message).getText());
- }
- if (messages.containsKey(message.getJMSMessageID())) {
- duplicateSignal.countDown();
- logger.fatal("duplicate message id detected:" + message.getJMSMessageID());
- fail("Duplicate message id detected:" + message.getJMSMessageID());
- }
- else {
- messages.put(message.getJMSMessageID(), message.getJMSMessageID());
- }
- }
- }
- });
- threadPool.submit(receivers[i]);
- }
-
- // starting adding the remaining messages
- for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) {
- if (duplicateSignal.getCount() == 0) {
- fail("Duplicate message id detected");
- }
- sendTextMessage(TEST_QUEUE_NAME, i);
- }
-
- logger.info("sent all " + TEST_MESSAGE_COUNT + " messages");
-
- // allow some time for messages to be delivered to receivers.
- boolean ok = Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return TEST_MESSAGE_COUNT == messages.size();
- }
- }, TimeUnit.MINUTES.toMillis(7));
- if (!ok) {
- AutoFailTestSupport.dumpAllThreads("--STUCK?--");
- }
- assertEquals("Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size());
- assertEquals(TEST_MESSAGE_COUNT, messageCount.get());
- }
-
- private final static class ThreadedMessageReceiver implements Runnable {
-
- private IMessageHandler handler = null;
- private final AtomicBoolean shouldStop = new AtomicBoolean(false);
-
- public ThreadedMessageReceiver(String queueName, IMessageHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public void run() {
-
- QueueConnection queueConnection = null;
- QueueSession session = null;
- QueueReceiver receiver = null;
- Queue queue = null;
- Message message = null;
- try {
- try {
-
- queueConnection = connectionFactory.createQueueConnection();
- // create a transacted session
- session = queueConnection.createQueueSession(TRANSACTED_RECEIVE, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(TEST_QUEUE_NAME);
- receiver = session.createReceiver(queue);
-
- // start the connection
- queueConnection.start();
-
- logger.info("Receiver " + Thread.currentThread().getName() + " connected.");
-
- // start receive loop
- while (!(shouldStop.get() || Thread.currentThread().isInterrupted())) {
- try {
- message = receiver.receive(200);
- }
- catch (Exception e) {
- //
- // ignore interrupted exceptions
- //
- if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) {
- /* ignore */
- }
- else {
- throw e;
- }
- }
-
- if (message != null && this.handler != null) {
- this.handler.onMessage(message);
- }
-
- // commit session on successful handling of message
- if (session.getTransacted()) {
- session.commit();
- }
- }
-
- logger.info("Receiver " + Thread.currentThread().getName() + " shutting down.");
-
- }
- finally {
- if (receiver != null) {
- try {
- receiver.close();
- }
- catch (JMSException e) {
- logger.warn(e);
- }
- }
- if (session != null) {
- try {
- session.close();
- }
- catch (JMSException e) {
- logger.warn(e);
- }
- }
- if (queueConnection != null) {
- queueConnection.close();
- }
- }
- }
- catch (JMSException e) {
- logger.error(e);
- e.printStackTrace();
- }
- catch (NamingException e) {
- logger.error(e);
- }
- catch (Exception e) {
- logger.error(e);
- e.printStackTrace();
- }
- }
-
- public void setShouldStop(Boolean shouldStop) {
- this.shouldStop.set(shouldStop);
- }
- }
-
- public interface IMessageHandler {
-
- void onMessage(Message message) throws Exception;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
deleted file mode 100644
index 7236581..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a test case for the issue reported at: https://issues.apache.org/activemq/browse/AMQ-2021 Bug is modification
- * of inflight message properties so the failure can manifest itself in a bunch or ways, from message receipt with null
- * properties to marshall errors
- */
-public class AMQ2021Test implements ExceptionListener, UncaughtExceptionHandler {
-
- private static final Logger log = LoggerFactory.getLogger(AMQ2021Test.class);
- BrokerService brokerService;
- ArrayList<Thread> threads = new ArrayList<>();
- Vector<Throwable> exceptions;
-
- @Rule
- public TestName name = new TestName();
-
- AMQ2021Test testCase;
-
- private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
- private String CONSUMER_BROKER_URL = "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
- private String PRODUCER_BROKER_URL;
-
- private final int numMessages = 1000;
- private final int numConsumers = 2;
- private final int dlqMessages = numMessages / 2;
-
- private CountDownLatch receivedLatch;
- private ActiveMQTopic destination;
- private CountDownLatch started;
-
- @Before
- public void setUp() throws Exception {
- Thread.setDefaultUncaughtExceptionHandler(this);
- testCase = this;
-
- // Start an embedded broker up.
- brokerService = new BrokerService();
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
- brokerService.start();
- destination = new ActiveMQTopic(name.getMethodName());
- exceptions = new Vector<>();
-
- CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString() + CONSUMER_BROKER_URL;
- PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
-
- receivedLatch = new CountDownLatch(numConsumers * (numMessages + dlqMessages));
- started = new CountDownLatch(1);
- }
-
- @After
- public void tearDown() throws Exception {
- for (Thread t : threads) {
- t.interrupt();
- t.join();
- }
- brokerService.stop();
- }
-
- @Test(timeout = 240000)
- public void testConcurrentTopicResendToDLQ() throws Exception {
-
- for (int i = 0; i < numConsumers; i++) {
- ConsumerThread c1 = new ConsumerThread("Consumer-" + i);
- threads.add(c1);
- c1.start();
- }
-
- assertTrue(started.await(10, TimeUnit.SECONDS));
-
- Thread producer = new Thread() {
- @Override
- public void run() {
- try {
- produce(numMessages);
- }
- catch (Exception e) {
- }
- }
- };
- threads.add(producer);
- producer.start();
-
- boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS);
- for (Throwable t : exceptions) {
- log.error("failing test with first exception", t);
- fail("exception during test : " + t);
- }
- assertTrue("excepted messages received within time limit", allGood);
-
- assertEquals(0, exceptions.size());
-
- for (int i = 0; i < numConsumers; i++) {
- // last recovery sends message to deq so is not received again
- assertEquals(dlqMessages * 2, ((ConsumerThread) threads.get(i)).recoveries);
- assertEquals(numMessages + dlqMessages, ((ConsumerThread) threads.get(i)).counter);
- }
-
- // half of the messages for each consumer should go to the dlq but duplicates will
- // be suppressed
- consumeFromDLQ(dlqMessages);
-
- }
-
- private void consumeFromDLQ(int messageCount) throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
- Connection connection = connectionFactory.createConnection();
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
- int count = 0;
- for (int i = 0; i < messageCount; i++) {
- if (dlqConsumer.receive(1000) == null) {
- break;
- }
- count++;
- }
- assertEquals(messageCount, count);
- }
-
- public void produce(int count) throws Exception {
- Connection connection = null;
- try {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL);
- connection = factory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setTimeToLive(0);
- connection.start();
-
- for (int i = 0; i < count; i++) {
- int id = i + 1;
- TextMessage message = session.createTextMessage(name.getMethodName() + " Message " + id);
- message.setIntProperty("MsgNumber", id);
- producer.send(message);
-
- if (id % 500 == 0) {
- log.info("sent " + id + ", ith " + message);
- }
- }
- }
- catch (JMSException e) {
- log.error("unexpected ex on produce", e);
- exceptions.add(e);
- }
- finally {
- try {
- if (connection != null) {
- connection.close();
- }
- }
- catch (Throwable e) {
- }
- }
- }
-
- public class ConsumerThread extends Thread implements MessageListener {
-
- public long counter = 0;
- public long recoveries = 0;
- private Session session;
-
- public ConsumerThread(String threadId) {
- super(threadId);
- }
-
- @Override
- public void run() {
- try {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL);
- Connection connection = connectionFactory.createConnection();
- connection.setExceptionListener(testCase);
- connection.setClientID(getName());
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = session.createDurableSubscriber(destination, getName());
- consumer.setMessageListener(this);
- connection.start();
-
- started.countDown();
-
- }
- catch (JMSException exception) {
- log.error("unexpected ex in consumer run", exception);
- exceptions.add(exception);
- }
- }
-
- @Override
- public void onMessage(Message message) {
- try {
- counter++;
- int messageNumber = message.getIntProperty("MsgNumber");
- if (messageNumber % 2 == 0) {
- session.recover();
- recoveries++;
- }
- else {
- message.acknowledge();
- }
-
- if (counter % 200 == 0) {
- log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th " + message);
- }
- receivedLatch.countDown();
- }
- catch (Exception e) {
- log.error("unexpected ex on onMessage", e);
- exceptions.add(e);
- }
- }
-
- }
-
- @Override
- public void onException(JMSException exception) {
- log.info("Unexpected JMSException", exception);
- exceptions.add(exception);
- }
-
- @Override
- public void uncaughtException(Thread thread, Throwable exception) {
- log.info("Unexpected exception from thread " + thread + ", ex: " + exception);
- exceptions.add(exception);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java
deleted file mode 100644
index de9f2b5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.naming.InitialContext;
-
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ2084Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2084Test.class);
- BrokerService broker;
- CountDownLatch qreceived;
- String connectionUri;
-
- @Before
- public void startBroker() throws Exception {
- broker = new BrokerService();
- broker.setPersistent(false);
- connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
- broker.start();
-
- qreceived = new CountDownLatch(1);
- }
-
- @After
- public void stopBroker() throws Exception {
- if (broker != null) {
- broker.stop();
- }
- }
-
- public void listenQueue(final String queueName, final String selectors) {
- try {
- Properties props = new Properties();
- props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- props.put("java.naming.provider.url", connectionUri);
- props.put("queue.queueName", queueName);
-
- javax.naming.Context ctx = new InitialContext(props);
- QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
- QueueConnection conn = factory.createQueueConnection();
- final Queue queue = (Queue) ctx.lookup("queueName");
- QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- QueueReceiver receiver = session.createReceiver(queue, selectors);
- System.out.println("Message Selector: " + receiver.getMessageSelector());
- receiver.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- if (message instanceof TextMessage) {
- TextMessage txtMsg = (TextMessage) message;
- String msg = txtMsg.getText();
- LOG.info("Queue Message Received: " + queueName + " - " + msg);
- qreceived.countDown();
-
- }
- message.acknowledge();
- }
- catch (Throwable e) {
- e.printStackTrace();
- }
- }
- });
- conn.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void listenTopic(final String topicName, final String selectors) {
- try {
- Properties props = new Properties();
- props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- props.put("java.naming.provider.url", connectionUri);
- props.put("topic.topicName", topicName);
-
- javax.naming.Context ctx = new InitialContext(props);
- TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory");
- TopicConnection conn = factory.createTopicConnection();
- final Topic topic = (Topic) ctx.lookup("topicName");
- TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber receiver = session.createSubscriber(topic, selectors, false);
-
- receiver.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- if (message instanceof TextMessage) {
- TextMessage txtMsg = (TextMessage) message;
- String msg = txtMsg.getText();
- LOG.info("Topic Message Received: " + topicName + " - " + msg);
- }
- message.acknowledge();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- conn.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void publish(String topicName, String message) {
- try {
- Properties props = new Properties();
- props.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- props.put("java.naming.provider.url", connectionUri);
- props.put("topic.topicName", topicName);
- javax.naming.Context ctx = new InitialContext(props);
- TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory");
- TopicConnection conn = factory.createTopicConnection();
- Topic topic = (Topic) ctx.lookup("topicName");
- TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicPublisher publisher = session.createPublisher(topic);
- if (message != null) {
- Message msg = session.createTextMessage(message);
- publisher.send(msg);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Test
- public void tryXpathSelectorMatch() throws Exception {
- String xPath = "XPATH '//books//book[@lang=''en'']'";
- listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath);
- publish("VirtualTopic.TestXpath", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book lang=\"en\">ABC</book></books>");
- assertTrue("topic received: ", qreceived.await(20, TimeUnit.SECONDS));
- }
-
- @Test
- public void tryXpathSelectorNoMatch() throws Exception {
- String xPath = "XPATH '//books//book[@lang=''es'']'";
- listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath);
- publish("VirtualTopic.TestXpath", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book lang=\"en\">ABC</book></books>");
- assertFalse("topic did not receive unmatched", qreceived.await(5, TimeUnit.SECONDS));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
deleted file mode 100644
index 8067305..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerTestSupport;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.usecases.MyObject;
-
-public class AMQ2103Test extends BrokerTestSupport {
-
- static PolicyEntry reduceMemoryFootprint = new PolicyEntry();
-
- static {
- reduceMemoryFootprint.setReduceMemoryFootprint(true);
- }
-
- public PolicyEntry defaultPolicy = reduceMemoryFootprint;
-
- @Override
- protected PolicyEntry getDefaultPolicy() {
- return defaultPolicy;
- }
-
- public void initCombosForTestVerifyMarshalledStateIsCleared() throws Exception {
- addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null});
- }
-
- public static Test suite() {
- return suite(AMQ2103Test.class);
- }
-
- /**
- * use mem persistence so no marshaling,
- * reduceMemoryFootprint on/off that will reduce memory by whacking the marshaled state
- * With vm transport and deferred serialisation and no persistence (mem persistence),
- * we see the message as sent by the client so we can validate the contents against
- * the policy
- *
- * @throws Exception
- */
- public void testVerifyMarshalledStateIsCleared() throws Exception {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
- factory.setOptimizedMessageDispatch(true);
- factory.setObjectMessageSerializationDefered(true);
- factory.setCopyMessageOnSend(false);
-
- Connection connection = factory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- ActiveMQDestination destination = new ActiveMQQueue("testQ");
- MessageConsumer consumer = session.createConsumer(destination);
- connection.start();
-
- MessageProducer producer = session.createProducer(destination);
- final MyObject obj = new MyObject("A message");
- ActiveMQObjectMessage m1 = (ActiveMQObjectMessage) session.createObjectMessage();
- m1.setObject(obj);
- producer.send(m1);
-
- ActiveMQTextMessage m2 = new ActiveMQTextMessage();
- m2.setText("Test Message Payload.");
- producer.send(m2);
-
- ActiveMQMapMessage m3 = new ActiveMQMapMessage();
- m3.setString("text", "my message");
- producer.send(m3);
-
- Message m = consumer.receive(maxWait);
- assertNotNull(m);
- assertEquals(m1.getMessageId().toString(), m.getJMSMessageID());
- assertTrue(m instanceof ActiveMQObjectMessage);
-
- if (getDefaultPolicy() != null) {
- assertNull("object data cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", ((ActiveMQObjectMessage) m).getObject());
- }
-
- // verify no serialisation via vm transport
- assertEquals("writeObject called", 0, obj.getWriteObjectCalled());
- assertEquals("readObject called", 0, obj.getReadObjectCalled());
- assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled());
-
- m = consumer.receive(maxWait);
- assertNotNull(m);
- assertEquals(m2.getMessageId().toString(), m.getJMSMessageID());
- assertTrue(m instanceof ActiveMQTextMessage);
-
- if (getDefaultPolicy() != null) {
- assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", ((ActiveMQTextMessage) m).getText());
- }
-
- m = consumer.receive(maxWait);
- assertNotNull(m);
- assertEquals(m3.getMessageId().toString(), m.getJMSMessageID());
- assertTrue(m instanceof ActiveMQMapMessage);
-
- if (getDefaultPolicy() != null) {
- assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", ((ActiveMQMapMessage) m).getStringProperty("text"));
- }
-
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java
deleted file mode 100644
index 8cda3ef..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.leveldb.LevelDBStore;
-
-public class AMQ2149LevelDBTest extends AMQ2149Test {
-
- @Override
- protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
- LevelDBStore persistenceFactory = new LevelDBStore();
- persistenceFactory.setDirectory(dataDirFile);
- brokerService.setPersistenceAdapter(persistenceFactory);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
deleted file mode 100644
index 19dbf0e..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
+++ /dev/null
@@ -1,614 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.io.File;
-import java.lang.IllegalStateException;
-import java.util.HashSet;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.*;
-
-import org.apache.activemq.AutoFailTestSupport;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationStatistics;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.util.LoggingBrokerPlugin;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-interface Configurer {
-
- public void configure(BrokerService broker) throws Exception;
-}
-
-public class AMQ2149Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ2149Test.class);
- @Rule
- public TestName testName = new TestName();
-
- private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
- private static final String DEFAULT_BROKER_URL = "failover:(" + BROKER_CONNECTOR + ")?maxReconnectDelay=1000&useExponentialBackOff=false";
-
- private final String SEQ_NUM_PROPERTY = "seqNum";
-
- final int MESSAGE_LENGTH_BYTES = 75 * 1024;
- final long SLEEP_BETWEEN_SEND_MS = 25;
- final int NUM_SENDERS_AND_RECEIVERS = 10;
- final Object brokerLock = new Object();
-
- private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000;
- private static final long DEFAULT_NUM_TO_SEND = 1400;
-
- long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
- long numtoSend = DEFAULT_NUM_TO_SEND;
- long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
- String brokerURL = DEFAULT_BROKER_URL;
-
- int numBrokerRestarts = 0;
- final static int MAX_BROKER_RESTARTS = 4;
- BrokerService broker;
- Vector<Throwable> exceptions = new Vector<>();
-
- protected File dataDirFile;
- final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()};
-
- public void createBroker(Configurer configurer) throws Exception {
- broker = new BrokerService();
- configurePersistenceAdapter(broker);
-
- broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS);
-
- broker.addConnector(BROKER_CONNECTOR);
- broker.setBrokerName(testName.getMethodName());
- broker.setDataDirectoryFile(dataDirFile);
- if (configurer != null) {
- configurer.configure(broker);
- }
- broker.start();
- }
-
- protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception {
- }
-
- @Before
- public void setUp() throws Exception {
- LOG.debug("Starting test {}", testName.getMethodName());
- dataDirFile = new File("target/" + testName.getMethodName());
- numtoSend = DEFAULT_NUM_TO_SEND;
- brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
- sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
- brokerURL = DEFAULT_BROKER_URL;
- }
-
- @After
- public void tearDown() throws Exception {
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<Boolean> future = executor.submit(new TeardownTask(brokerLock, broker));
- try {
- LOG.debug("Teardown started.");
- long start = System.currentTimeMillis();
- Boolean result = future.get(30, TimeUnit.SECONDS);
- long finish = System.currentTimeMillis();
- LOG.debug("Result of teardown: {} after {} ms ", result, (finish - start));
- }
- catch (TimeoutException e) {
- fail("Teardown timed out");
- AutoFailTestSupport.dumpAllThreads(testName.getMethodName());
- }
- executor.shutdownNow();
- exceptions.clear();
- }
-
- private String buildLongString() {
- final StringBuilder stringBuilder = new StringBuilder(MESSAGE_LENGTH_BYTES);
- for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) {
- stringBuilder.append((int) (Math.random() * 10));
- }
- return stringBuilder.toString();
- }
-
- HashSet<Connection> connections = new HashSet<>();
-
- private class Receiver implements MessageListener {
-
- private final javax.jms.Destination dest;
-
- private final Connection connection;
-
- private final Session session;
-
- private final MessageConsumer messageConsumer;
-
- private AtomicLong nextExpectedSeqNum = new AtomicLong();
-
- private final boolean transactional;
-
- private String lastId = null;
-
- public Receiver(javax.jms.Destination dest, boolean transactional) throws JMSException {
- this.dest = dest;
- this.transactional = transactional;
- connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
- connection.setClientID(dest.toString());
- session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
- if (ActiveMQDestination.transform(dest).isTopic()) {
- messageConsumer = session.createDurableSubscriber((Topic) dest, dest.toString());
- }
- else {
- messageConsumer = session.createConsumer(dest);
- }
- messageConsumer.setMessageListener(this);
- connection.start();
- connections.add(connection);
- }
-
- public void close() throws JMSException {
- connection.close();
- }
-
- public long getNextExpectedSeqNo() {
- return nextExpectedSeqNum.get();
- }
-
- final int TRANSACITON_BATCH = 500;
- boolean resumeOnNextOrPreviousIsOk = false;
-
- @Override
- public void onMessage(Message message) {
- try {
- final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
- if ((seqNum % TRANSACITON_BATCH) == 0) {
- LOG.info(dest + " received " + seqNum);
-
- if (transactional) {
- LOG.info("committing..");
- session.commit();
- }
- }
- if (resumeOnNextOrPreviousIsOk) {
- // after an indoubt commit we need to accept what we get (within reason)
- if (seqNum != nextExpectedSeqNum.get()) {
- final long l = nextExpectedSeqNum.get();
- if (seqNum == l - (TRANSACITON_BATCH - 1)) {
- nextExpectedSeqNum.compareAndSet(l, l - (TRANSACITON_BATCH - 1));
- LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum);
- }
- }
- resumeOnNextOrPreviousIsOk = false;
- }
- if (seqNum != nextExpectedSeqNum.get()) {
- LOG.warn(dest + " received " + seqNum + " in msg: " + message.getJMSMessageID() + " expected " + nextExpectedSeqNum + ", lastId: " + lastId + ", message:" + message);
- fail(dest + " received " + seqNum + " expected " + nextExpectedSeqNum);
- }
- nextExpectedSeqNum.incrementAndGet();
- lastId = message.getJMSMessageID();
- }
- catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) {
- LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
- if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion in doubt")) {
- // in doubt - either commit command or reply missing
- // don't know if we will get a replay
- resumeOnNextOrPreviousIsOk = true;
- nextExpectedSeqNum.incrementAndGet();
- LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum);
- }
- else {
- resumeOnNextOrPreviousIsOk = false;
- // batch will be replayed
- nextExpectedSeqNum.addAndGet(-(TRANSACITON_BATCH - 1));
- }
-
- }
- catch (Throwable e) {
- LOG.error(dest + " onMessage error", e);
- exceptions.add(e);
- }
- }
-
- }
-
- private class Sender implements Runnable {
-
- private final javax.jms.Destination dest;
-
- private final Connection connection;
-
- private final Session session;
-
- private final MessageProducer messageProducer;
-
- private volatile long nextSequenceNumber = 0;
- private final Object guard = new Object();
-
- public Sender(javax.jms.Destination dest) throws JMSException {
- this.dest = dest;
- connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- messageProducer = session.createProducer(dest);
- messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- connection.start();
- connections.add(connection);
- }
-
- @Override
- public void run() {
- final String longString = buildLongString();
- long nextSequenceNumber = this.nextSequenceNumber;
- while (nextSequenceNumber < numtoSend) {
- try {
- final Message message = session.createTextMessage(longString);
- message.setLongProperty(SEQ_NUM_PROPERTY, nextSequenceNumber);
- synchronized (guard) {
- if (nextSequenceNumber == this.nextSequenceNumber) {
- this.nextSequenceNumber = nextSequenceNumber + 1;
- messageProducer.send(message);
- }
- else {
- continue;
- }
- }
-
- if ((nextSequenceNumber % 500) == 0) {
- LOG.info(dest + " sent " + nextSequenceNumber);
- }
-
- }
- catch (javax.jms.IllegalStateException e) {
- LOG.error(dest + " bailing on send error", e);
- exceptions.add(e);
- break;
- }
- catch (Exception e) {
- LOG.error(dest + " send error", e);
- exceptions.add(e);
- }
- if (sleepBetweenSend > 0) {
- try {
- Thread.sleep(sleepBetweenSend);
- }
- catch (InterruptedException e) {
- LOG.warn(dest + " sleep interrupted", e);
- }
- }
- }
- try {
- connection.close();
- }
- catch (JMSException ignored) {
- }
- }
- }
-
- // attempt to simply replicate leveldb failure. no joy yet
- public void x_testRestartReReceive() throws Exception {
- createBroker(new Configurer() {
- @Override
- public void configure(BrokerService broker) throws Exception {
- broker.deleteAllMessages();
- }
- });
-
- final javax.jms.Destination destination = ActiveMQDestination.createDestination("test.dest.X", ActiveMQDestination.QUEUE_TYPE);
- Thread thread = new Thread(new Sender(destination));
- thread.start();
- thread.join();
-
- Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
- connection.setClientID(destination.toString());
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer messageConsumer = session.createConsumer(destination);
- connection.start();
-
- int batch = 200;
- long expectedSeq;
-
- final TimerTask restartTask = scheduleRestartTask(null, new Configurer() {
- @Override
- public void configure(BrokerService broker) throws Exception {
- }
- });
-
- expectedSeq = 0;
- for (int s = 0; s < 4; s++) {
- for (int i = 0; i < batch; i++) {
- Message message = messageConsumer.receive(20000);
- assertNotNull("s:" + s + ", i:" + i, message);
- final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
- assertEquals("expected order s:" + s, expectedSeq++, seqNum);
-
- if (i > 0 && i % 600 == 0) {
- LOG.info("Commit on %5");
- // session.commit();
- }
- }
- restartTask.run();
- }
-
- }
-
- // no need to run this unless there are some issues with the others
- public void vanilaVerify_testOrder() throws Exception {
-
- createBroker(new Configurer() {
- @Override
- public void configure(BrokerService broker) throws Exception {
- broker.deleteAllMessages();
- }
- });
-
- verifyOrderedMessageReceipt();
- verifyStats(false);
- }
-
- @Test(timeout = 5 * 60 * 1000)
- public void testOrderWithRestart() throws Exception {
- createBroker(new Configurer() {
- @Override
- public void configure(BrokerService broker) throws Exception {
- broker.deleteAllMessages();
- }
- });
-
- final Timer timer = new Timer();
- scheduleRestartTask(timer, new Configurer() {
- @Override
- public void configure(BrokerService broker) throws Exception {
- }
- });
-
- try {
- verifyOrderedMessageReceipt();
- }
- finally {
- timer.cancel();
- }
-
- verifyStats(true);
- }
-
- @Test(timeout = 5 * 60 * 1000)
- public void testTopicOrderWithRestart() throws Exception {
- createBroker(new Configurer() {
- @Override
- public void configure(BrokerService broker) throws Exception {
- broker.deleteAllMessages();
- }
- });
-
- final Timer timer = new Timer();
- scheduleRestartTask(timer, null);
-
- try {
- verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);
- }
- finally {
- timer.cancel();
- }
-
- verifyStats(true);
- }
-
- @Test(timeout = 5 * 60 * 1000)
- public void testQueueTransactionalOrderWithRestart() throws Exception {
- doTestTransactionalOrderWithRestart(ActiveMQDestination.QUEUE_TYPE);
- }
-
- @Test(timeout = 5 * 60 * 1000)
- public void testTopicTransactionalOrderWithRestart() throws Exception {
- doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE);
- }
-
- public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
- numtoSend = 10000;
- sleepBetweenSend = 3;
- brokerStopPeriod = 10 * 1000;
-
- createBroker(new Configurer() {
- @Override
- public void configure(BrokerService broker) throws Exception {
- broker.deleteAllMessages();
- }
- });
-
- final Timer timer = new Timer();
- scheduleRestartTask(timer, null);
-
- try {
- verifyOrderedMessageReceipt(destinationType, 1, true);
- }
- finally {
- timer.cancel();
- }
-
- verifyStats(true);
- }
-
- private void verifyStats(boolean brokerRestarts) throws Exception {
- RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
-
- for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
- DestinationStatistics stats = dest.getDestinationStatistics();
- if (brokerRestarts) {
- // all bets are off w.r.t stats as there may be duplicate sends and duplicate
- // dispatches, all of which will be suppressed - either by the reference store
- // not allowing duplicate references or consumers acking duplicates
- LOG.info("with restart: not asserting qneue/dequeue stat match for: " + dest.getName() + " " + stats.getEnqueues().getCount() + " <= " + stats.getDequeues().getCount());
- }
- else {
- assertEquals("qneue/dequeue match for: " + dest.getName(), stats.getEnqueues().getCount(), stats.getDequeues().getCount());
- }
- }
- }
-
- private TimerTask scheduleRestartTask(final Timer timer, final Configurer configurer) {
- class RestartTask extends TimerTask {
-
- @Override
- public void run() {
- synchronized (brokerLock) {
- LOG.info("stopping broker..");
- try {
- broker.stop();
- broker.waitUntilStopped();
- }
- catch (Exception e) {
- LOG.error("ex on broker stop", e);
- exceptions.add(e);
- }
- LOG.info("restarting broker");
- try {
- createBroker(configurer);
- broker.waitUntilStarted();
- }
- catch (Exception e) {
- LOG.error("ex on broker restart", e);
- exceptions.add(e);
- }
- }
- if (++numBrokerRestarts < MAX_BROKER_RESTARTS && timer != null) {
- // do it again
- try {
- timer.schedule(new RestartTask(), brokerStopPeriod);
- }
- catch (IllegalStateException ignore_alreadyCancelled) {
- }
- }
- else {
- LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS);
- }
- }
- }
- RestartTask task = new RestartTask();
- if (timer != null) {
- timer.schedule(task, brokerStopPeriod);
- }
- return task;
- }
-
- private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
- verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false);
- }
-
- private void verifyOrderedMessageReceipt() throws Exception {
- verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false);
- }
-
- private void verifyOrderedMessageReceipt(byte destinationType,
- int concurrentPairs,
- boolean transactional) throws Exception {
-
- Vector<Thread> threads = new Vector<>();
- Vector<Receiver> receivers = new Vector<>();
-
- for (int i = 0; i < concurrentPairs; ++i) {
- final javax.jms.Destination destination = ActiveMQDestination.createDestination("test.dest." + i, destinationType);
- receivers.add(new Receiver(destination, transactional));
- Thread thread = new Thread(new Sender(destination));
- thread.start();
- threads.add(thread);
- }
-
- final long expiry = System.currentTimeMillis() + 1000 * 60 * 4;
- while (!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
- Thread sendThread = threads.firstElement();
- sendThread.join(1000 * 30);
- if (!sendThread.isAlive()) {
- threads.remove(sendThread);
- }
- else {
- AutoFailTestSupport.dumpAllThreads("Send blocked");
- }
- }
- LOG.info("senders done..." + threads);
-
- while (!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
- Receiver receiver = receivers.firstElement();
- if (receiver.getNextExpectedSeqNo() >= numtoSend || !exceptions.isEmpty()) {
- receiver.close();
- receivers.remove(receiver);
- }
- }
-
- for (Connection connection : connections) {
- try {
- connection.close();
- }
- catch (Exception ignored) {
- }
- }
- connections.clear();
-
- assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry);
- if (!exceptions.isEmpty()) {
- exceptions.get(0).printStackTrace();
- }
-
- LOG.info("Dangling threads: " + threads);
- for (Thread dangling : threads) {
- dangling.interrupt();
- dangling.join(10 * 1000);
- }
-
- assertTrue("No exceptions", exceptions.isEmpty());
- }
-
-}
-
-class TeardownTask implements Callable<Boolean> {
-
- private final Object brokerLock;
- private BrokerService broker;
-
- public TeardownTask(Object brokerLock, BrokerService broker) {
- this.brokerLock = brokerLock;
- this.broker = broker;
- }
-
- @Override
- public Boolean call() throws Exception {
- synchronized (brokerLock) {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
- return Boolean.TRUE;
- }
-}