You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/03/20 15:06:46 UTC
svn commit: r756469 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
Author: gtully
Date: Fri Mar 20 14:06:45 2009
New Revision: 756469
URL: http://svn.apache.org/viewvc?rev=756469&view=rev
Log:
further tests for AMQ-2149
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=756469&r1=756468&r2=756469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Fri Mar 20 14:06:45 2009
@@ -17,6 +17,8 @@
package org.apache.activemq.bugs;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.Vector;
import junit.framework.TestCase;
@@ -33,45 +35,54 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+interface Configurer {
+ public void configure(BrokerService broker) throws Exception;
+}
+
public class AMQ2149Test extends TestCase {
- private static final Log log = LogFactory.getLog(AMQ2149Test.class);
+ private static final Log LOG = LogFactory.getLog(AMQ2149Test.class);
+
+ private static final long BROKER_STOP_PERIOD = 15 * 1000;
- private String BROKER_URL;
+ private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
+ private static final String BROKER_URL = "failover:("+ BROKER_CONNECTOR
+ +")?maxReconnectDelay=1000&useExponentialBackOff=false";
+
private final String SEQ_NUM_PROPERTY = "seqNum";
final int MESSAGE_LENGTH_BYTES = 75000;
final int MAX_TO_SEND = 2000;
final long SLEEP_BETWEEN_SEND_MS = 5;
final int NUM_SENDERS_AND_RECEIVERS = 10;
+ final Object brokerLock = new Object();
BrokerService broker;
Vector<Throwable> exceptions = new Vector<Throwable>();
- public void setUp() throws Exception {
+ public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService();
- broker.addConnector("tcp://localhost:0");
- broker.deleteAllMessages();
-
- SystemUsage usage = new SystemUsage();
- MemoryUsage memoryUsage = new MemoryUsage();
- memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
- usage.setMemoryUsage(memoryUsage);
- broker.setSystemUsage(usage);
+ broker.setDataDirectory("target/amq-data/" + getName());
+ broker.addConnector(BROKER_CONNECTOR);
+ if (configurer != null) {
+ configurer.configure(broker);
+ }
+ broker.setBrokerName(getName());
broker.start();
-
- BROKER_URL = "failover:("
- + broker.getTransportConnectors().get(0).getUri()
- +")?maxReconnectDelay=1000&useExponentialBackOff=false";
}
public void tearDown() throws Exception {
- broker.stop();
+ synchronized(brokerLock) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ exceptions.clear();
}
private String buildLongString() {
@@ -94,6 +105,8 @@
private final MessageConsumer messageConsumer;
private volatile long nextExpectedSeqNum = 0;
+
+ private String lastId = null;
public Receiver(String queueName) throws JMSException {
this.queueName = queueName;
@@ -106,21 +119,33 @@
connection.start();
}
+ public void close() throws JMSException {
+ connection.close();
+ }
+
+ public long getNextExpectedSeqNo() {
+ return nextExpectedSeqNum;
+ }
+
public void onMessage(Message message) {
try {
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
if ((seqNum % 100) == 0) {
- log.info(queueName + " received " + seqNum);
+ LOG.info(queueName + " received " + seqNum);
}
if (seqNum != nextExpectedSeqNum) {
- log.warn(queueName + " received " + seqNum + " expected "
- + nextExpectedSeqNum);
+ LOG.warn(queueName + " received " + seqNum
+ + " in msg: " + message.getJMSMessageID()
+ + " expected "
+ + nextExpectedSeqNum
+ + ", lastId: " + lastId);
fail(queueName + " received " + seqNum + " expected "
+ nextExpectedSeqNum);
}
++nextExpectedSeqNum;
+ lastId = message.getJMSMessageID();
} catch (Throwable e) {
- log.error(queueName + " onMessage error", e);
+ LOG.error(queueName + " onMessage error", e);
exceptions.add(e);
}
}
@@ -161,38 +186,150 @@
++nextSequenceNumber;
messageProducer.send(message);
} catch (Exception e) {
- log.error(queueName + " send error", e);
+ LOG.error(queueName + " send error", e);
exceptions.add(e);
}
try {
Thread.sleep(SLEEP_BETWEEN_SEND_MS);
} catch (InterruptedException e) {
- log.warn(queueName + " sleep interrupted", e);
+ LOG.warn(queueName + " sleep interrupted", e);
}
}
+ try {
+ connection.close();
+ } catch (JMSException ignored) {
+ }
}
}
- public void testOutOfOrderWithMemeUsageLimit() throws Exception {
+ public void testOrderWithMemeUsageLimit() throws Exception {
+
+ createBroker(new Configurer() {
+ public void configure(BrokerService broker) throws Exception {
+ SystemUsage usage = new SystemUsage();
+ MemoryUsage memoryUsage = new MemoryUsage();
+ memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
+ usage.setMemoryUsage(memoryUsage);
+ broker.setSystemUsage(usage);
+
+ broker.deleteAllMessages();
+ }
+ });
+
+ verifyOrderedMessageReceipt();
+ }
+
+ public void testOrderWithRestartVMIndex() throws Exception {
+ createBroker(new Configurer() {
+ public void configure(BrokerService broker) throws Exception {
+ AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ persistenceFactory.setPersistentIndex(false);
+ broker.setPersistenceFactory(persistenceFactory);
+ broker.deleteAllMessages();
+ }
+ });
+
+ final Timer timer = new Timer();
+ schedualRestartTask(timer, new Configurer() {
+ public void configure(BrokerService broker) throws Exception {
+ AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ persistenceFactory.setPersistentIndex(false);
+ broker.setPersistenceFactory(persistenceFactory);
+ }
+ });
+
+ try {
+ verifyOrderedMessageReceipt();
+ } finally {
+ timer.cancel();
+ }
+ }
+
+
+ public void x_testOrderWithRestartWithForceRecover() throws Exception {
+ createBroker(new Configurer() {
+ public void configure(BrokerService broker) throws Exception {
+ AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ persistenceFactory.setForceRecoverReferenceStore(true);
+ broker.setPersistenceFactory(persistenceFactory);
+ broker.deleteAllMessages();
+ }
+ });
+
+ final Timer timer = new Timer();
+ schedualRestartTask(timer, new Configurer() {
+ public void configure(BrokerService broker) throws Exception {
+ AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ persistenceFactory.setForceRecoverReferenceStore(true);
+ broker.setPersistenceFactory(persistenceFactory);
+ }
+ });
+
+ try {
+ verifyOrderedMessageReceipt();
+ } finally {
+ timer.cancel();
+ }
+ }
+
+ private void schedualRestartTask(Timer timer) {
+ schedualRestartTask(timer, null);
+ }
+
+ private void schedualRestartTask(final Timer timer, final Configurer configurer) {
+ timer.schedule(new TimerTask() {
+ public void run() {
+ synchronized (brokerLock) {
+ LOG.info("stopping broker..");
+ try {
+ broker.stop();
+ } catch (Exception e) {
+ LOG.error("ex on broker stop", e);
+ exceptions.add(e);
+ }
+ LOG.info("restarting broker");
+ try {
+ createBroker(configurer);
+ } catch (Exception e) {
+ LOG.error("ex on broker restart", e);
+ exceptions.add(e);
+ }
+ }
+ // do once
+ // timer.cancel();
+ }
+ }, BROKER_STOP_PERIOD, BROKER_STOP_PERIOD);
+ }
+
+ private void verifyOrderedMessageReceipt() throws Exception {
+
Vector<Thread> threads = new Vector<Thread>();
+ Vector<Receiver> receivers = new Vector<Receiver>();
for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
final String queueName = "test.queue." + i;
- new Receiver(queueName);
+ receivers.add(new Receiver(queueName));
Thread thread = new Thread(new Sender(queueName));
thread.start();
threads.add(thread);
}
final long expiry = System.currentTimeMillis() + 1000 * 60 * 5;
- while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
+ while(!threads.isEmpty() && !receivers.isEmpty()
+ && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
Thread sendThread = threads.firstElement();
sendThread.join(1000*10);
if (!sendThread.isAlive()) {
threads.remove(sendThread);
}
+
+ Receiver receiver = receivers.firstElement();
+ if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND) {
+ receiver.close();
+ receivers.remove(receiver);
+ }
}
- assertTrue("No timeout waiting for senders to complete", System.currentTimeMillis() < expiry);
+ assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry);
assertTrue("No exceptions", exceptions.isEmpty());
}