You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/07 17:43:17 UTC
svn commit: r1443600 [3/4] - in
/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq:
perf/ security/ selector/ spring/ store/ store/jdbc/ store/kahadb/
store/kahadb/perf/ test/message/ test/retroactive/ transport/
transport/failover/ ...
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java Thu Feb 7 16:43:15 2013
@@ -29,12 +29,11 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.test.JmsSendReceiveTestSupport;
-import org.apache.activemq.test.retroactive.RetroactiveConsumerTestWithSimpleMessageListTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ *
*/
public class CompositePublishTest extends JmsSendReceiveTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(CompositePublishTest.class);
@@ -43,8 +42,11 @@ public class CompositePublishTest extend
protected Connection receiveConnection;
protected Session receiveSession;
protected MessageConsumer[] consumers;
+ @SuppressWarnings("rawtypes")
protected List[] messageLists;
+ @SuppressWarnings("unchecked")
+ @Override
protected void setUp() throws Exception {
super.setUp();
@@ -95,6 +97,7 @@ public class CompositePublishTest extend
protected MessageListener createMessageListener(int i, final List<Message> messageList) {
return new MessageListener() {
+ @Override
public void onMessage(Message message) {
consumeMessage(message, messageList);
}
@@ -104,6 +107,7 @@ public class CompositePublishTest extend
/**
* Returns the subject on which we publish
*/
+ @Override
protected String getSubject() {
return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y";
}
@@ -119,6 +123,8 @@ public class CompositePublishTest extend
return super.getSubject() + ".";
}
+ @SuppressWarnings("unchecked")
+ @Override
protected void assertMessagesAreReceived() throws JMSException {
waitForMessagesToBeDelivered();
int size = messageLists.length;
@@ -131,10 +137,12 @@ public class CompositePublishTest extend
}
}
+ @Override
protected ActiveMQConnectionFactory createConnectionFactory() {
return new ActiveMQConnectionFactory("vm://localhost");
}
+ @Override
protected void tearDown() throws Exception {
session.close();
receiveSession.close();
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java Thu Feb 7 16:43:15 2013
@@ -18,56 +18,47 @@ package org.apache.activemq.usecases;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsConnectionStartStopTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
/**
- * Test case intended to demonstrate delivery interruption to queue consumers when
- * a JMS selector leaves some messages on the queue (due to use of a JMS Selector)
- *
- * testNonDiscriminatingConsumer() demonstrates proper functionality for consumers that don't use
- * a selector to qualify their input.
- *
- * testDiscriminatingConsumer() demonstrates the failure condition in which delivery to the consumer
- * eventually halts.
- *
- * The expected behavior is for the delivery to the client to be maintained regardless of the depth
- * of the queue, particularly when the messages in the queue do not meet the selector criteria of the
- * client.
+ * Test case intended to demonstrate delivery interruption to queue consumers when a JMS selector leaves some messages
+ * on the queue (due to use of a JMS Selector)
+ *
+ * testNonDiscriminatingConsumer() demonstrates proper functionality for consumers that don't use a selector to qualify
+ * their input.
+ *
+ * testDiscriminatingConsumer() demonstrates the failure condition in which delivery to the consumer eventually halts.
+ *
+ * The expected behavior is for the delivery to the client to be maintained regardless of the depth of the queue,
+ * particularly when the messages in the queue do not meet the selector criteria of the client.
*
* https://issues.apache.org/activemq/browse/AMQ-2217
- *
+ *
*/
public class DiscriminatingConsumerLoadTest extends TestSupport {
- private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
- .getLog(DiscriminatingConsumerLoadTest.class);
+ private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory.getLog(DiscriminatingConsumerLoadTest.class);
+
+ private Connection producerConnection;
+ private Connection consumerConnection;
- private Connection producerConnection;
- private Connection consumerConnection;
- private int counterSent = 0;
- private int counterReceived = 0;
-
- public static final String JMSTYPE_EATME = "DiscriminatingLoadClient.EatMe";
- public static final String JMSTYPE_IGNOREME = "DiscriminatingLoadClient.IgnoreMe";
+ public static final String JMSTYPE_EATME = "DiscriminatingLoadClient.EatMe";
+ public static final String JMSTYPE_IGNOREME = "DiscriminatingLoadClient.IgnoreMe";
- private int testSize = 5000; // setting this to a small number will pass all tests
+ private final int testSize = 5000; // setting this to a small number will pass all tests
BrokerService broker;
- protected void setUp() throws Exception {
+ @Override
+ protected void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
@@ -79,258 +70,241 @@ public class DiscriminatingConsumerLoadT
broker.setDestinationPolicy(policyMap);
broker.start();
- super.setUp();
- this.producerConnection = this.createConnection();
- this.consumerConnection = this.createConnection();
- }
-
- /**
- * @see junit.framework.TestCase#tearDown()
- */
- protected void tearDown() throws Exception {
- if (producerConnection != null) {
- producerConnection.close();
- producerConnection = null;
- }
- if (consumerConnection != null) {
- consumerConnection.close();
- consumerConnection = null;
- }
- super.tearDown();
+ super.setUp();
+ this.producerConnection = this.createConnection();
+ this.consumerConnection = this.createConnection();
+ }
+
+ /**
+ * @see junit.framework.TestCase#tearDown()
+ */
+ @Override
+ protected void tearDown() throws Exception {
+ if (producerConnection != null) {
+ producerConnection.close();
+ producerConnection = null;
+ }
+ if (consumerConnection != null) {
+ consumerConnection.close();
+ consumerConnection = null;
+ }
+ super.tearDown();
broker.stop();
- }
-
- /**
- * Test to check if a single consumer with no JMS selector will receive all intended messages
- *
- * @throws java.lang.Exception
- */
- public void testNonDiscriminatingConsumer() throws Exception {
-
- consumerConnection = createConnection();
- consumerConnection.start();
- LOG.info("consumerConnection = " +consumerConnection);
-
- try {Thread.sleep(1000); } catch (Exception e) {}
-
- // here we pass in null for the JMS selector
- Consumer consumer = new Consumer(consumerConnection, null);
- Thread consumerThread = new Thread(consumer);
-
- consumerThread.start();
-
- producerConnection = createConnection();
- producerConnection.start();
- LOG.info("producerConnection = " +producerConnection);
+ }
- try {Thread.sleep(3000); } catch (Exception e) {}
+ /**
+ * Test to check if a single consumer with no JMS selector will receive all intended messages
+ *
+ * @throws java.lang.Exception
+ */
+ public void testNonDiscriminatingConsumer() throws Exception {
+
+ consumerConnection = createConnection();
+ consumerConnection.start();
+ LOG.info("consumerConnection = " + consumerConnection);
+
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ }
- Producer producer = new Producer(producerConnection);
- Thread producerThread = new Thread(producer);
- producerThread.start();
+ // here we pass in null for the JMS selector
+ Consumer consumer = new Consumer(consumerConnection, null);
+ Thread consumerThread = new Thread(consumer);
+
+ consumerThread.start();
+
+ producerConnection = createConnection();
+ producerConnection.start();
+ LOG.info("producerConnection = " + producerConnection);
+
+ try {
+ Thread.sleep(3000);
+ } catch (Exception e) {
+ }
- // now that everything is running, let's wait for the consumer thread to finish ...
- consumerThread.join();
- producer.stop = true;
+ Producer producer = new Producer(producerConnection);
+ Thread producerThread = new Thread(producer);
+ producerThread.start();
+
+ // now that everything is running, let's wait for the consumer thread to finish ...
+ consumerThread.join();
+ producer.stop = true;
- if (consumer.getCount() == testSize )
- LOG.info("test complete .... all messsages consumed!!");
- else
- LOG.info("test failed .... Sent " + (testSize / 1) +
- " messages intended to be consumed ( " + testSize + " total), but only consumed " + consumer.getCount());
+ if (consumer.getCount() == testSize)
+ LOG.info("test complete .... all messsages consumed!!");
+ else
+ LOG.info("test failed .... Sent " + (testSize / 1) + " messages intended to be consumed ( " + testSize + " total), but only consumed "
+ + consumer.getCount());
+ assertTrue("Sent " + testSize + " messages intended to be consumed, but only consumed " + consumer.getCount(), (consumer.getCount() == testSize));
+ assertFalse("Delivery of messages to consumer was halted during this test", consumer.deliveryHalted());
+ }
+
+ /**
+ * Test to check if a single consumer with a JMS selector will receive all intended messages
+ *
+ * @throws java.lang.Exception
+ */
+ public void testDiscriminatingConsumer() throws Exception {
+
+ consumerConnection = createConnection();
+ consumerConnection.start();
+ LOG.info("consumerConnection = " + consumerConnection);
+
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ }
- assertTrue("Sent " + testSize + " messages intended to be consumed, but only consumed " + consumer.getCount(),
- (consumer.getCount() == testSize ));
- assertFalse("Delivery of messages to consumer was halted during this test", consumer.deliveryHalted());
+ // here we pass the JMS selector we intend to consume
+ Consumer consumer = new Consumer(consumerConnection, JMSTYPE_EATME);
+ Thread consumerThread = new Thread(consumer);
+
+ consumerThread.start();
+
+ producerConnection = createConnection();
+ producerConnection.start();
+ LOG.info("producerConnection = " + producerConnection);
+
+ try {
+ Thread.sleep(3000);
+ } catch (Exception e) {
+ }
+ Producer producer = new Producer(producerConnection);
+ Thread producerThread = new Thread(producer);
+ producerThread.start();
+
+ // now that everything is running, let's wait for the consumer thread to finish ...
+ consumerThread.join();
+ producer.stop = true;
+
+ if (consumer.getCount() == (testSize / 2)) {
+ LOG.info("test complete .... all messsages consumed!!");
+ } else {
+ LOG.info("test failed .... Sent " + testSize + " original messages, only half of which (" + (testSize / 2)
+ + ") were intended to be consumed: consumer paused at: " + consumer.getCount());
+ // System.out.println("test failed .... Sent " + testSize + " original messages, only half of which (" +
+ // (testSize / 2) +
+ // ") were intended to be consumed: consumer paused at: " + consumer.getCount());
- }
-
- /**
- * Test to check if a single consumer with a JMS selector will receive all intended messages
- *
- * @throws java.lang.Exception
- */
- public void testDiscriminatingConsumer() throws Exception {
+ }
- consumerConnection = createConnection();
- consumerConnection.start();
- LOG.info("consumerConnection = " +consumerConnection);
+ assertTrue("Sent " + testSize + " original messages, only half of which (" + (testSize / 2) + ") were intended to be consumed: consumer paused at: "
+ + consumer.getCount(), (consumer.getCount() == (testSize / 2)));
+ assertTrue("Delivery of messages to consumer was halted during this test as it only wants half", consumer.deliveryHalted());
+ }
+
+ /**
+ * Helper class that will publish 2 * testSize messages. The messages will be distributed evenly between the
+ * following two JMS types:
+ *
+ * @see JMSTYPE_INTENDED_FOR_CONSUMPTION
+ * @see JMSTYPE_NOT_INTENDED_FOR_CONSUMPTION
+ *
+ */
+ private class Producer extends Thread {
+ private int counterSent = 0;
+ private Connection connection = null;
+ public boolean stop = false;
- try {Thread.sleep(1000); } catch (Exception e) {}
+ public Producer(Connection connection) {
+ this.connection = connection;
+ }
- // here we pass the JMS selector we intend to consume
- Consumer consumer = new Consumer(consumerConnection, JMSTYPE_EATME);
- Thread consumerThread = new Thread(consumer);
+ @Override
+ public void run() {
+ try {
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue queue = session.createQueue("test");
+
+ // wait for 10 seconds to allow consumer.receive to be run
+ // first
+ Thread.sleep(10000);
+ MessageProducer producer = session.createProducer(queue);
+
+ while (!stop && (counterSent < testSize)) {
+ // first send a message intended to be consumed ....
+ TextMessage message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ...
+ message.setJMSType(JMSTYPE_EATME);
+ // LOG.info("sending .... JMSType = " + message.getJMSType());
+ producer.send(message, DeliveryMode.NON_PERSISTENT, 0, 1800000);
+
+ counterSent++;
+
+ // now send a message intended to be consumed by some other consumer in the the future
+ // ... we expect these messages to accrue in the queue
+ message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ...
+ message.setJMSType(JMSTYPE_IGNOREME);
+ // LOG.info("sending .... JMSType = " + message.getJMSType());
+ producer.send(message, DeliveryMode.NON_PERSISTENT, 0, 1800000);
+
+ counterSent++;
+ }
+
+ session.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ LOG.info("producer thread complete ... " + counterSent + " messages sent to the queue");
+ }
+ }
- consumerThread.start();
+ /**
+ * Helper class that will consume messages from the queue based on the supplied JMS selector. Thread will stop after
+ * the first receive(..) timeout, or once all expected messages have been received (see testSize). If the thread
+ * stops due to a timeout, it is experiencing the delivery pause that is symptomatic of a bug in the broker.
+ *
+ */
+ private class Consumer extends Thread {
+ protected int counterReceived = 0;
+ private String jmsSelector = null;
+ private boolean deliveryHalted = false;
- producerConnection = createConnection();
- producerConnection.start();
- LOG.info("producerConnection = " +producerConnection);
+ public Consumer(Connection connection, String jmsSelector) {
+ this.jmsSelector = jmsSelector;
+ }
- try {Thread.sleep(3000); } catch (Exception e) {}
+ @Override
+ public void run() {
+ try {
+ Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue queue = session.createQueue("test");
+ MessageConsumer consumer = null;
+ if (null != this.jmsSelector) {
+ consumer = session.createConsumer(queue, "JMSType='" + this.jmsSelector + "'");
+ } else {
+ consumer = session.createConsumer(queue);
+ }
+
+ while (!deliveryHalted && (counterReceived < testSize)) {
+ TextMessage result = (TextMessage) consumer.receive(30000);
+ if (result != null) {
+ counterReceived++;
+ // System.out.println("consuming .... JMSType = " + result.getJMSType() + " received = " +
+ // counterReceived);
+ LOG.info("consuming .... JMSType = " + result.getJMSType() + " received = " + counterReceived);
+ } else {
+ LOG.info("consuming .... timeout while waiting for a message ... broker must have stopped delivery ... received = " + counterReceived);
+ deliveryHalted = true;
+ }
+ }
+ session.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
- Producer producer = new Producer(producerConnection);
- Thread producerThread = new Thread(producer);
- producerThread.start();
+ }
- // now that everything is running, let's wait for the consumer thread to finish ...
- consumerThread.join();
- producer.stop = true;
+ public int getCount() {
+ return this.counterReceived;
+ }
- if (consumer.getCount() == (testSize / 2))
- {
- LOG.info("test complete .... all messsages consumed!!");
+ public boolean deliveryHalted() {
+ return this.deliveryHalted;
}
- else
- {
- LOG.info("test failed .... Sent " + testSize + " original messages, only half of which (" + (testSize / 2) +
- ") were intended to be consumed: consumer paused at: " + consumer.getCount());
- //System.out.println("test failed .... Sent " + testSize + " original messages, only half of which (" + (testSize / 2) +
- // ") were intended to be consumed: consumer paused at: " + consumer.getCount());
-
- }
-
- assertTrue("Sent " + testSize + " original messages, only half of which (" + (testSize / 2) +
- ") were intended to be consumed: consumer paused at: " + consumer.getCount(),
- (consumer.getCount() == (testSize / 2)));
- assertTrue("Delivery of messages to consumer was halted during this test as it only wants half", consumer.deliveryHalted());
- }
-
- /**
- * Helper class that will publish 2 * testSize messages. The messages will be distributed evenly
- * between the following two JMS types:
- *
- * @see JMSTYPE_INTENDED_FOR_CONSUMPTION
- * @see JMSTYPE_NOT_INTENDED_FOR_CONSUMPTION
- *
- * @author jlyons
- *
- */
- private class Producer extends Thread
- {
- private int counterSent = 0;
- private Connection connection = null;
- public boolean stop = false;
-
- public Producer(Connection connection)
- {
- this.connection = connection;
- }
-
- public void run() {
- try {
- final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue queue = session.createQueue("test");
-
- // wait for 10 seconds to allow consumer.receive to be run
- // first
- Thread.sleep(10000);
- MessageProducer producer = session.createProducer(queue);
-
- while (!stop && (counterSent < testSize))
- {
- // first send a message intended to be consumed ....
- TextMessage message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ...
- message.setJMSType(JMSTYPE_EATME);
- //LOG.info("sending .... JMSType = " + message.getJMSType());
- producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
-
- counterSent++;
-
- // now send a message intended to be consumed by some other consumer in the the future
- // ... we expect these messages to accrue in the queue
- message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ...
- message.setJMSType(JMSTYPE_IGNOREME);
- //LOG.info("sending .... JMSType = " + message.getJMSType());
- producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000);
-
- counterSent++;
- }
-
- session.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- LOG.info("producer thread complete ... " + counterSent + " messages sent to the queue");
- }
-
- public int getCount()
- {
- return this.counterSent;
- }
-
- }
-
- /**
- * Helper class that will consume messages from the queue based on the supplied JMS selector.
- * Thread will stop after the first receive(..) timeout, or once all expected messages have
- * been received (see testSize). If the thread stops due to a timeout, it is experiencing the
- * delivery pause that is symptomatic of a bug in the broker.
- *
- * @author jlyons
- *
- */
- private class Consumer extends Thread
- {
- protected int counterReceived = 0;
- private Connection connection = null;
- private String jmsSelector = null;
- private boolean deliveryHalted = false;
-
- public Consumer(Connection connection, String jmsSelector)
- {
- this.connection = connection;
- this.jmsSelector = jmsSelector;
- }
-
- public void run() {
- boolean testComplete = false;
- try {
- Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue queue = session.createQueue("test");
- MessageConsumer consumer = null;
- if (null != this.jmsSelector)
- {
- consumer = session.createConsumer(queue, "JMSType='" + this.jmsSelector + "'");
- }
- else
- {
- consumer = session.createConsumer(queue);
- }
-
- while (!deliveryHalted && (counterReceived < testSize))
- {
- TextMessage result = (TextMessage) consumer.receive(30000);
- if (result != null) {
- counterReceived++;
- //System.out.println("consuming .... JMSType = " + result.getJMSType() + " received = " + counterReceived);
- LOG.info("consuming .... JMSType = " + result.getJMSType() + " received = " + counterReceived);
- } else
- {
- LOG.info("consuming .... timeout while waiting for a message ... broker must have stopped delivery ... received = " + counterReceived);
- deliveryHalted = true;
- }
- }
- session.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- public int getCount()
- {
- return this.counterReceived;
- }
-
- public boolean deliveryHalted()
- {
- return this.deliveryHalted;
- }
- }
+ }
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubProcessConcurrentCommitActivateNoDuplicateTest.java Thu Feb 7 16:43:15 2013
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.usecases;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -27,6 +29,7 @@ import java.util.concurrent.ConcurrentLi
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@@ -53,9 +56,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertTrue;
-
public class DurableSubProcessConcurrentCommitActivateNoDuplicateTest {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessConcurrentCommitActivateNoDuplicateTest.class);
public static final long RUNTIME = 5 * 60 * 1000;
@@ -69,9 +69,9 @@ public class DurableSubProcessConcurrent
public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 10 * 1000);
public static final int CLIENT_OFFLINE_DURING_COMMIT = 2; // random(x) == x
-
+
public static final Persistence PERSISTENT_ADAPTER = Persistence.KAHADB;
-
+
public static final long BROKER_RESTART = -2 * 60 * 1000;
public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
@@ -126,10 +126,10 @@ public class DurableSubProcessConcurrent
//allow the clients to unsubscribe before finishing
clientManager.setEnd(true);
try {
- Thread.sleep(60 * 1000);
- } catch (InterruptedException e) {
- exit("ProcessTest.testProcess failed.", e);
- }
+ Thread.sleep(60 * 1000);
+ } catch (InterruptedException e) {
+ exit("ProcessTest.testProcess failed.", e);
+ }
server.done = true;
@@ -181,7 +181,7 @@ public class DurableSubProcessConcurrent
int transRover = 0;
int messageRover = 0;
- public volatile int committingTransaction = -1;
+ public volatile int committingTransaction = -1;
public boolean done = false;
public Server() {
super("Server");
@@ -194,8 +194,8 @@ public class DurableSubProcessConcurrent
try {
while (!done) {
- Thread.sleep(1000);
-
+ Thread.sleep(1000);
+
processLock.readLock().lock();
try {
send();
@@ -347,7 +347,7 @@ public class DurableSubProcessConcurrent
private final CopyOnWriteArrayList<Client> clients = new CopyOnWriteArrayList<Client>();
- private boolean end;
+ private boolean end;
public ClientManager() {
super("ClientManager");
@@ -355,11 +355,11 @@ public class DurableSubProcessConcurrent
}
public synchronized void setEnd(boolean end) {
- this.end = end;
-
- }
+ this.end = end;
+
+ }
- @Override
+ @Override
public void run() {
try {
while (true) {
@@ -491,10 +491,10 @@ public class DurableSubProcessConcurrent
offline.sleepRandom();
else
sleep = true;
- */
-
+ */
+
Thread.sleep(100);
-
+
processLock.readLock().lock();
onlineCount.incrementAndGet();
try {
@@ -524,7 +524,7 @@ public class DurableSubProcessConcurrent
private void process(long millis) throws JMSException {
//long end = System.currentTimeMillis() + millis;
- long end = System.currentTimeMillis() + 200;
+ long end = System.currentTimeMillis() + 200;
long hardEnd = end + 20000; // wait to finish the transaction.
boolean inTransaction = false;
int transCount = 0;
@@ -565,7 +565,7 @@ public class DurableSubProcessConcurrent
inTransaction = false;
transCount = 0;
-
+
int committing = server.committingTransaction;
if (committing == trans) {
LOG.info("Going offline during transaction commit. messageID=" + message.getIntProperty("ID"));
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java Thu Feb 7 16:43:15 2013
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.usecases;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -26,6 +29,7 @@ import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@@ -33,6 +37,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
@@ -47,10 +52,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class DurableSubProcessWithRestartTest {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessWithRestartTest.class);
public static final long RUNTIME = 5 * 60 * 1000;
@@ -93,22 +94,18 @@ public class DurableSubProcessWithRestar
if (ALLOW_SUBSCRIPTION_ABANDONMENT)
houseKeeper.start();
- if (BROKER_RESTART <= 0)
- Thread.sleep(RUNTIME);
- else {
- long end = System.currentTimeMillis() + RUNTIME;
+ long end = System.currentTimeMillis() + RUNTIME;
- while (true) {
- long now = System.currentTimeMillis();
- if (now > end)
- break;
+ while (true) {
+ long now = System.currentTimeMillis();
+ if (now > end)
+ break;
- now = end - now;
- now = now < BROKER_RESTART ? now : BROKER_RESTART;
- Thread.sleep(now);
+ now = end - now;
+ now = now < BROKER_RESTART ? now : BROKER_RESTART;
+ Thread.sleep(now);
- restartBroker();
- }
+ restartBroker();
}
} catch (Throwable e) {
exit("ProcessTest.testProcess failed.", e);
@@ -588,6 +585,7 @@ public class DurableSubProcessWithRestar
/**
* Checks if the message was not delivered fast enough.
*/
+ @SuppressWarnings("unused")
public void checkDeliveryTime(Message message) throws JMSException {
long creation = message.getJMSTimestamp();
long min = System.currentTimeMillis() - (offline.max + online.min)
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Thu Feb 7 16:43:15 2013
@@ -736,9 +736,7 @@ public class DurableSubscriptionOfflineT
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
- int sent = 0;
for (int i = 0; i < 10; i++) {
- sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "true");
producer.send(topic, message);
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java Thu Feb 7 16:43:15 2013
@@ -25,9 +25,9 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import javax.management.MBeanServer;
-import javax.management.ObjectName;
import junit.framework.Test;
+
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
@@ -74,7 +74,8 @@ public class DurableSubscriptionSelector
sendMessage(true);
- Wait.waitFor(new Wait.Condition() { public boolean isSatisified() { return received >= 1;} }, 10000);
+ Wait.waitFor(new Wait.Condition() { @Override
+ public boolean isSatisified() { return received >= 1;} }, 10000);
assertEquals("Message is not received.", 1, received);
@@ -92,6 +93,7 @@ public class DurableSubscriptionSelector
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subName", "filter=true", false);
subscriber.setMessageListener(new MessageListener() {
+ @Override
public void onMessage(Message message) {
received++;
}
@@ -123,22 +125,10 @@ public class DurableSubscriptionSelector
producerConnection = null;
}
- private int getPendingQueueSize() throws Exception {
- ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
- for (ObjectName sub: subs) {
- if ("cliID".equals(mbs.getAttribute(sub, "ClientId"))) {
- Integer size = (Integer) mbs.getAttribute(sub, "PendingQueueSize");
- return size != null ? size : 0;
- }
- }
- assertTrue(false);
- return -1;
- }
-
private void startBroker(boolean deleteMessages) throws Exception {
broker = new BrokerService();
broker.setBrokerName("test-broker");
-
+
if (deleteMessages) {
broker.setDeleteAllMessagesOnStartup(true);
}
@@ -163,7 +153,8 @@ public class DurableSubscriptionSelector
broker.stop();
broker = null;
}
-
+
+ @Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://test-broker?jms.watchTopicAdvisories=false&waitForStart=5000&create=false");
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java Thu Feb 7 16:43:15 2013
@@ -16,14 +16,21 @@
*/
package org.apache.activemq.usecases;
+import java.io.File;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.management.InstanceNotFoundException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
-import org.apache.activemq.broker.jmx.SubscriptionView;
-import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
@@ -31,14 +38,6 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import javax.jms.Connection;
-import javax.jms.Session;
-import javax.management.*;
-import java.io.File;
-import java.lang.management.ManagementFactory;
-import java.util.List;
-
-
public class DurableSubscriptionUnsubscribeTest extends TestSupport {
BrokerService broker = null;
@@ -193,7 +192,7 @@ public class DurableSubscriptionUnsubscr
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId" + i);
session.close();
- }
+ }
}
@@ -283,6 +282,7 @@ public class DurableSubscriptionUnsubscr
startBroker(false);
}
+ @Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://" + getName() + "?waitForStart=5000&create=false");
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Thu Feb 7 16:43:15 2013
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.usecases;
+import static org.apache.activemq.TestSupport.getDestination;
+import static org.apache.activemq.TestSupport.getDestinationStatistics;
+
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
@@ -42,9 +45,6 @@ import org.apache.activemq.store.amq.AMQ
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.activemq.TestSupport.getDestination;
-import static org.apache.activemq.TestSupport.getDestinationStatistics;
-
public class ExpiredMessagesTest extends CombinationTestSupport {
@@ -69,6 +69,7 @@ public class ExpiredMessagesTest extends
junit.textui.TestRunner.run(suite());
}
+ @Override
protected void setUp() throws Exception {
final boolean deleteAllMessages = true;
broker = createBroker(deleteAllMessages, 100);
@@ -87,6 +88,7 @@ public class ExpiredMessagesTest extends
final AtomicLong received = new AtomicLong();
Thread consumerThread = new Thread("Consumer Thread") {
+ @Override
public void run() {
long start = System.currentTimeMillis();
try {
@@ -109,6 +111,7 @@ public class ExpiredMessagesTest extends
final int numMessagesToSend = 10000;
Thread producingThread = new Thread("Producing Thread") {
+ @Override
public void run() {
try {
int i = 0;
@@ -132,6 +135,7 @@ public class ExpiredMessagesTest extends
// wait for all to inflight to expire
assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
return view.getInflight().getCount() == 0;
}
@@ -143,6 +147,7 @@ public class ExpiredMessagesTest extends
// wait for all sent to get delivered and expire
assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
long oldEnqueues = view.getEnqueues().getCount();
Thread.sleep(200);
@@ -159,6 +164,7 @@ public class ExpiredMessagesTest extends
assertTrue("got at least what did not expire", received.get() >= view.getDequeues().getCount() - view.getExpired().getCount());
assertTrue("all messages expired - queue size gone to zero " + view.getMessages().getCount(), Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
+ ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
@@ -174,6 +180,7 @@ public class ExpiredMessagesTest extends
+ ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount());
Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
return totalExpiredCount == dlqView.getMessages().getCount();
}
@@ -190,6 +197,7 @@ public class ExpiredMessagesTest extends
dlqConsumer.setMessageListener(dlqListener);
Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
return totalExpiredCount == dlqListener.count;
}
@@ -202,6 +210,7 @@ public class ExpiredMessagesTest extends
int count = 0;
+ @Override
public void onMessage(Message message) {
count++;
}
@@ -228,6 +237,7 @@ public class ExpiredMessagesTest extends
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Thread producingThread = new Thread("Producing Thread") {
+ @Override
public void run() {
try {
int i = 0;
@@ -266,6 +276,7 @@ public class ExpiredMessagesTest extends
broker = createBroker(deleteAllMessages, 5000);
Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
DestinationStatistics view = getDestinationStatistics(broker, destination);
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
@@ -311,6 +322,7 @@ public class ExpiredMessagesTest extends
+ @Override
protected void tearDown() throws Exception {
connection.stop();
broker.stop();
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JdbcDurableSubDupTest.java Thu Feb 7 16:43:15 2013
@@ -16,11 +16,14 @@
*/
package org.apache.activemq.usecases;
+import static org.junit.Assert.assertTrue;
+
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@@ -31,20 +34,17 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
-
-import static org.junit.Assert.assertTrue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class JdbcDurableSubDupTest {
@@ -63,7 +63,7 @@ public class JdbcDurableSubDupTest {
final int TO_RECEIVE = 5000;
BrokerService broker = null;
- Vector<Throwable> exceptions = new Vector();
+ Vector<Throwable> exceptions = new Vector<Throwable>();
final int MAX_MESSAGES = 100000;
int[] dupChecker = new int[MAX_MESSAGES];
@@ -136,7 +136,7 @@ public class JdbcDurableSubDupTest {
for (int i = 0; i < TO_RECEIVE; i++) {
assertTrue("got message " + i, dupChecker[i] == 1);
}
-
+
}
class JmsConsumerDup implements MessageListener {
@@ -185,6 +185,7 @@ public class JdbcDurableSubDupTest {
}
}
+ @Override
public void onMessage(Message message) {
++count;
@@ -229,6 +230,7 @@ public class JdbcDurableSubDupTest {
int priorityModulator = 10;
+ @Override
public void run() {
Connection connection;
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java Thu Feb 7 16:43:15 2013
@@ -77,7 +77,7 @@ public class MessageGroupCloseTest exten
if (i % 100 == 0) {
LOG.info("Sent messages: group=" + i);
}
- messageGroupCount++;
+ setMessageGroupCount(getMessageGroupCount() + 1);
}
LOG.info(messagesSent+" messages sent");
latchMessagesCreated.countDown();
@@ -218,4 +218,18 @@ public class MessageGroupCloseTest exten
messageGroups.put(groupId, count + 1);
}
}
+
+ /**
+ * @return the messageGroupCount
+ */
+ public int getMessageGroupCount() {
+ return messageGroupCount;
+ }
+
+ /**
+ * @param messageGroupCount the messageGroupCount to set
+ */
+ public void setMessageGroupCount(int messageGroupCount) {
+ this.messageGroupCount = messageGroupCount;
+ }
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java Thu Feb 7 16:43:15 2013
@@ -40,208 +40,204 @@ import org.apache.activemq.command.Activ
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+public class MessageGroupDelayedTest extends JmsTestSupport {
+ public static final Logger log = LoggerFactory.getLogger(MessageGroupDelayedTest.class);
+ protected Connection connection;
+ protected Session session;
+ protected MessageProducer producer;
+ protected Destination destination;
+
+ public int consumersBeforeDispatchStarts;
+ public int timeBeforeDispatchStarts;
+
+ BrokerService broker;
+ protected TransportConnector connector;
+
+ protected HashMap<String, Integer> messageCount = new HashMap<String, Integer>();
+ protected HashMap<String, Set<String>> messageGroups = new HashMap<String, Set<String>>();
+
+ public static Test suite() {
+ return suite(MessageGroupDelayedTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ broker = createBroker();
+ broker.start();
+ ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=1");
+ connection = connFactory.createConnection();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ destination = new ActiveMQQueue("test-queue2");
+ producer = session.createProducer(destination);
+ connection.start();
+ }
+
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ BrokerService service = new BrokerService();
+ service.setPersistent(false);
+ service.setUseJmx(false);
+
+ // Setup a destination policy where it takes only 1 message at a time.
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ log.info("testing with consumersBeforeDispatchStarts=" + consumersBeforeDispatchStarts + " and timeBeforeDispatchStarts=" + timeBeforeDispatchStarts);
+ policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
+ policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
+ policyMap.setDefaultEntry(policy);
+ service.setDestinationPolicy(policyMap);
+
+ connector = service.addConnector("tcp://localhost:0");
+ return service;
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ producer.close();
+ session.close();
+ connection.close();
+ broker.stop();
+ }
+
+ public void initCombosForTestDelayedDirectConnectionListener() {
+ addCombinationValues("consumersBeforeDispatchStarts", new Object[] { 0, 3, 5 });
+ addCombinationValues("timeBeforeDispatchStarts", new Object[] { 0, 100 });
+ }
+
+ public void testDelayedDirectConnectionListener() throws Exception {
+
+ for (int i = 0; i < 10; i++) {
+ Message msga = session.createTextMessage("hello a");
+ msga.setStringProperty("JMSXGroupID", "A");
+ producer.send(msga);
+ Message msgb = session.createTextMessage("hello b");
+ msgb.setStringProperty("JMSXGroupID", "B");
+ producer.send(msgb);
+ Message msgc = session.createTextMessage("hello c");
+ msgc.setStringProperty("JMSXGroupID", "C");
+ producer.send(msgc);
+ }
+ log.info("30 messages sent to group A/B/C");
+ int[] counters = { 10, 10, 10 };
-public class MessageGroupDelayedTest extends JmsTestSupport {
- public static final Logger log = LoggerFactory.getLogger(MessageGroupDelayedTest.class);
- protected Connection connection;
- protected Session session;
- protected MessageProducer producer;
- protected Destination destination;
-
- public int consumersBeforeDispatchStarts;
- public int timeBeforeDispatchStarts;
-
- BrokerService broker;
- protected TransportConnector connector;
-
- protected HashMap<String, Integer> messageCount = new HashMap<String, Integer>();
- protected HashMap<String, Set<String>> messageGroups = new HashMap<String, Set<String>>();
-
- public static Test suite() {
- return suite(MessageGroupDelayedTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
- public void setUp() throws Exception {
- broker = createBroker();
- broker.start();
- ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri() + "?jms.prefetchPolicy.all=1");
- connection = connFactory.createConnection();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- destination = new ActiveMQQueue("test-queue2");
- producer = session.createProducer(destination);
- connection.start();
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService service = new BrokerService();
- service.setPersistent(false);
- service.setUseJmx(false);
-
- // Setup a destination policy where it takes only 1 message at a time.
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry policy = new PolicyEntry();
- log.info("testing with consumersBeforeDispatchStarts=" + consumersBeforeDispatchStarts + " and timeBeforeDispatchStarts=" + timeBeforeDispatchStarts);
- policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
- policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
- policyMap.setDefaultEntry(policy);
- service.setDestinationPolicy(policyMap);
-
- connector = service.addConnector("tcp://localhost:0");
- return service;
- }
-
- public void tearDown() throws Exception {
- producer.close();
- session.close();
- connection.close();
- broker.stop();
- }
-
-
-
- public void initCombosForTestDelayedDirectConnectionListener() {
- addCombinationValues("consumersBeforeDispatchStarts", new Object[] {0, 3, 5});
- addCombinationValues("timeBeforeDispatchStarts", new Object[] {0, 100});
- }
-
- public void testDelayedDirectConnectionListener() throws Exception {
-
- for(int i = 0; i < 10; i++) {
- Message msga = session.createTextMessage("hello a");
- msga.setStringProperty("JMSXGroupID", "A");
- producer.send(msga);
- Message msgb = session.createTextMessage("hello b");
- msgb.setStringProperty("JMSXGroupID", "B");
- producer.send(msgb);
- Message msgc = session.createTextMessage("hello c");
- msgc.setStringProperty("JMSXGroupID", "C");
- producer.send(msgc);
- }
- log.info("30 messages sent to group A/B/C");
-
- int[] counters = {10, 10, 10};
-
- CountDownLatch startSignal = new CountDownLatch(1);
- CountDownLatch doneSignal = new CountDownLatch(1);
-
- messageCount.put("worker1", 0);
- messageGroups.put("worker1", new HashSet<String>());
- Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups);
- messageCount.put("worker2", 0);
- messageGroups.put("worker2", new HashSet<String>());
- Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups);
- messageCount.put("worker3", 0);
- messageGroups.put("worker3", new HashSet<String>());
- Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups);
-
-
- new Thread(worker1).start();
- new Thread(worker2).start();
- new Thread(worker3).start();
-
- startSignal.countDown();
- doneSignal.await();
-
- // check results
- if (consumersBeforeDispatchStarts == 0 && timeBeforeDispatchStarts == 0) {
- log.info("Ignoring results because both parameters are 0");
- return;
- }
-
- for (String worker: messageCount.keySet()) {
- log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker));
- assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
- , 10, messageCount.get(worker).intValue());
- assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
- , 1, messageGroups.get(worker).size());
- }
-
- }
-
- private static final class Worker implements Runnable {
- private Connection connection = null;
- private Destination queueName = null;
- private String workerName = null;
- private CountDownLatch startSignal = null;
- private CountDownLatch doneSignal = null;
- private int[] counters = null;
- private HashMap<String, Integer> messageCount;
- private HashMap<String, Set<String>>messageGroups;
-
-
- private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>>messageGroups) {
- this.connection = connection;
- this.queueName = queueName;
- this.workerName = workerName;
- this.startSignal = startSignal;
- this.doneSignal = doneSignal;
- this.counters = counters;
- this.messageCount = messageCount;
- this.messageGroups = messageGroups;
- }
-
- private void update(String group) {
- int msgCount = messageCount.get(workerName);
- messageCount.put(workerName, msgCount + 1);
- Set<String> groups = messageGroups.get(workerName);
- groups.add(group);
- messageGroups.put(workerName, groups);
- }
-
- public void run() {
-
- try {
- log.info(workerName);
- startSignal.await();
- Session sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = sess.createConsumer(queueName);
-
- while(true) {
- if(counters[0] == 0 && counters[1] == 0 && counters[2] == 0 ) {
- doneSignal.countDown();
- log.info(workerName + " done...");
- break;
- }
-
- Message msg = consumer.receive(500);
- if(msg == null)
- continue;
-
- String group = msg.getStringProperty("JMSXGroupID");
- boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer");
-
- if("A".equals(group)){
- --counters[0];
- update(group);
- Thread.sleep(500);
- }
- else if("B".equals(group)) {
- --counters[1];
- update(group);
- Thread.sleep(100);
- }
- else if("C".equals(group)) {
- --counters[2];
- update(group);
- Thread.sleep(10);
- }
- else {
- log.warn("unknown group");
- }
- if (counters[0] != 0 || counters[1] != 0 || counters[2] != 0 ) {
- msg.acknowledge();
- }
- }
- consumer.close();
- sess.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch doneSignal = new CountDownLatch(1);
+
+ messageCount.put("worker1", 0);
+ messageGroups.put("worker1", new HashSet<String>());
+ Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups);
+ messageCount.put("worker2", 0);
+ messageGroups.put("worker2", new HashSet<String>());
+ Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups);
+ messageCount.put("worker3", 0);
+ messageGroups.put("worker3", new HashSet<String>());
+ Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups);
+
+ new Thread(worker1).start();
+ new Thread(worker2).start();
+ new Thread(worker3).start();
+
+ startSignal.countDown();
+ doneSignal.await();
+
+ // check results
+ if (consumersBeforeDispatchStarts == 0 && timeBeforeDispatchStarts == 0) {
+ log.info("Ignoring results because both parameters are 0");
+ return;
+ }
+
+ for (String worker : messageCount.keySet()) {
+ log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker));
+ assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker), 10, messageCount
+ .get(worker).intValue());
+ assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker), 1, messageGroups
+ .get(worker).size());
+ }
+
+ }
+
+ private static final class Worker implements Runnable {
+ private Connection connection = null;
+ private Destination queueName = null;
+ private String workerName = null;
+ private CountDownLatch startSignal = null;
+ private CountDownLatch doneSignal = null;
+ private int[] counters = null;
+ private final HashMap<String, Integer> messageCount;
+ private final HashMap<String, Set<String>> messageGroups;
+
+ private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters,
+ HashMap<String, Integer> messageCount, HashMap<String, Set<String>> messageGroups) {
+ this.connection = connection;
+ this.queueName = queueName;
+ this.workerName = workerName;
+ this.startSignal = startSignal;
+ this.doneSignal = doneSignal;
+ this.counters = counters;
+ this.messageCount = messageCount;
+ this.messageGroups = messageGroups;
+ }
+
+ private void update(String group) {
+ int msgCount = messageCount.get(workerName);
+ messageCount.put(workerName, msgCount + 1);
+ Set<String> groups = messageGroups.get(workerName);
+ groups.add(group);
+ messageGroups.put(workerName, groups);
+ }
+
+ @Override
+ public void run() {
+
+ try {
+ log.info(workerName);
+ startSignal.await();
+ Session sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = sess.createConsumer(queueName);
+
+ while (true) {
+ if (counters[0] == 0 && counters[1] == 0 && counters[2] == 0) {
+ doneSignal.countDown();
+ log.info(workerName + " done...");
+ break;
+ }
+
+ Message msg = consumer.receive(500);
+ if (msg == null)
+ continue;
+
+ String group = msg.getStringProperty("JMSXGroupID");
+ msg.getBooleanProperty("JMSXGroupFirstForConsumer");
+
+ if ("A".equals(group)) {
+ --counters[0];
+ update(group);
+ Thread.sleep(500);
+ } else if ("B".equals(group)) {
+ --counters[1];
+ update(group);
+ Thread.sleep(100);
+ } else if ("C".equals(group)) {
+ --counters[2];
+ update(group);
+ Thread.sleep(10);
+ } else {
+ log.warn("unknown group");
+ }
+ if (counters[0] != 0 || counters[1] != 0 || counters[2] != 0) {
+ msg.acknowledge();
+ }
+ }
+ consumer.close();
+ sess.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
}
- }
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MyObject.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MyObject.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MyObject.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MyObject.java Thu Feb 7 16:43:15 2013
@@ -17,19 +17,18 @@
package org.apache.activemq.usecases;
-import java.io.Serializable;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.IOException;
-import java.io.ObjectStreamException;
+import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
public class MyObject implements Serializable {
+ private static final long serialVersionUID = -2505777188753549398L;
+
private String message;
- private AtomicInteger writeObjectCalled = new AtomicInteger(0);
- private AtomicInteger readObjectCalled = new AtomicInteger(0);
- private AtomicInteger readObjectNoDataCalled = new AtomicInteger(0);
+ private final AtomicInteger writeObjectCalled = new AtomicInteger(0);
+ private final AtomicInteger readObjectCalled = new AtomicInteger(0);
+ private final AtomicInteger readObjectNoDataCalled = new AtomicInteger(0);
public MyObject(String message) {
this.setMessage(message);
@@ -50,11 +49,7 @@ public class MyObject implements Seriali
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
- readObjectCalled.incrementAndGet();
- }
-
- private void readObjectNoData() throws ObjectStreamException {
- readObjectNoDataCalled.incrementAndGet();
+ readObjectCalled.incrementAndGet();
}
public int getWriteObjectCalled() {
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NewConsumerCreatesDestinationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NewConsumerCreatesDestinationTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NewConsumerCreatesDestinationTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NewConsumerCreatesDestinationTest.java Thu Feb 7 16:43:15 2013
@@ -28,14 +28,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
- *
+ *
+ *
*/
public class NewConsumerCreatesDestinationTest extends EmbeddedBrokerAndConnectionTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(NewConsumerCreatesDestinationTest.class);
private ActiveMQQueue wildcard;
-
+
public void testNewConsumerCausesNewDestinationToBeAutoCreated() throws Exception {
// lets create a wildcard thats kinda like those used by Virtual Topics
@@ -44,10 +44,10 @@ public class NewConsumerCreatesDestinati
LOG.info("Using wildcard: " + wildcard);
LOG.info("on destination: " + destination);
-
+
assertDestinationCreated(destination, false);
assertDestinationCreated(wildcard, false);
-
+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(destination);
@@ -56,7 +56,7 @@ public class NewConsumerCreatesDestinati
}
protected void assertDestinationCreated(Destination destination, boolean expected) throws Exception {
- Set answer = broker.getBroker().getDestinations((ActiveMQDestination) destination);
+ Set<?> answer = broker.getBroker().getDestinations((ActiveMQDestination) destination);
int size = expected ? 1 : 0;
assertEquals("Size of found destinations: " + answer, size, answer.size());
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java Thu Feb 7 16:43:15 2013
@@ -16,6 +16,19 @@
*/
package org.apache.activemq.usecases;
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
@@ -26,13 +39,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.*;
-
-import java.net.URI;
-import java.util.Enumeration;
-
-import static org.junit.Assert.*;
-
public class QueueBrowsingTest {
private static final Logger LOG = LoggerFactory.getLogger(QueueBrowsingTest.class);
@@ -80,11 +86,12 @@ public class QueueBrowsingTest {
}
QueueBrowser browser = session.createBrowser(queue);
- Enumeration enumeration = browser.getEnumeration();
+ Enumeration<?> enumeration = browser.getEnumeration();
int received = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
received++;
+ LOG.info("Browsed message " + received + ": " + m.getJMSMessageID());
}
browser.close();
@@ -117,11 +124,12 @@ public class QueueBrowsingTest {
public void run() {
try {
QueueBrowser browser = session.createBrowser(queue);
- Enumeration enumeration = browser.getEnumeration();
+ Enumeration<?> enumeration = browser.getEnumeration();
int received = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
received++;
+ LOG.info("Browsed message " + received + ": " + m.getJMSMessageID());
}
assertEquals("Browsed all messages", messageToSend, received);
} catch (Exception e) {
@@ -139,7 +147,7 @@ public class QueueBrowsingTest {
MessageConsumer consumer = session.createConsumer(queue);
int received = 0;
while (true) {
- Message m = (Message) consumer.receive(1000);
+ Message m = consumer.receive(1000);
if (m == null)
break;
received++;
@@ -155,7 +163,6 @@ public class QueueBrowsingTest {
browserThread.join();
consumerThread.join();
-
}
@Test
@@ -180,7 +187,7 @@ public class QueueBrowsingTest {
}
QueueBrowser browser = session.createBrowser(queue);
- Enumeration enumeration = browser.getEnumeration();
+ Enumeration<?> enumeration = browser.getEnumeration();
int received = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
@@ -189,10 +196,6 @@ public class QueueBrowsingTest {
}
browser.close();
-
assertEquals(3, received);
}
-
-
-
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java?rev=1443600&r1=1443599&r2=1443600&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java Thu Feb 7 16:43:15 2013
@@ -27,10 +27,12 @@ import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
import java.util.Map;
import java.util.Vector;
+
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
@@ -56,7 +58,7 @@ public class RequestReplyNoAdvisoryNetwo
ActiveMQQueue sendQ = new ActiveMQQueue("sendQ");
static final String connectionIdMarker = "ID:marker.";
ActiveMQTempQueue replyQWildcard = new ActiveMQTempQueue(connectionIdMarker + ">");
- private long receiveTimeout = 30000;
+ private final long receiveTimeout = 30000;
public void testNonAdvisoryNetworkRequestReplyXmlConfig() throws Exception {
final String xmlConfigString = new String(
@@ -197,9 +199,9 @@ public class RequestReplyNoAdvisoryNetwo
assertTrue("all temps are gone on " + regionBroker.getBrokerName(), Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
- Map tempTopics = regionBroker.getTempTopicRegion().getDestinationMap();
+ Map<?,?> tempTopics = regionBroker.getTempTopicRegion().getDestinationMap();
LOG.info("temp topics on " + regionBroker.getBrokerName() + ", " + tempTopics);
- Map tempQ = regionBroker.getTempQueueRegion().getDestinationMap();
+ Map<?,?> tempQ = regionBroker.getTempQueueRegion().getDestinationMap();
LOG.info("temp queues on " + regionBroker.getBrokerName() + ", " + tempQ);
return tempQ.isEmpty() && tempTopics.isEmpty();
}
@@ -236,6 +238,7 @@ public class RequestReplyNoAdvisoryNetwo
}
}
+ @Override
public void tearDown() throws Exception {
for (BrokerService broker: brokers) {
broker.stop();