You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/03/25 11:17:49 UTC
svn commit: r758200 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
Author: gtully
Date: Wed Mar 25 10:17:45 2009
New Revision: 758200
URL: http://svn.apache.org/viewvc?rev=758200&view=rev
Log:
share the test
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=758200&r1=758199&r2=758200&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Wed Mar 25 10:17:45 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.bugs;
+import java.io.File;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
@@ -34,6 +35,9 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.usage.MemoryUsage;
@@ -59,24 +63,34 @@
final int MESSAGE_LENGTH_BYTES = 75000;
final int MAX_TO_SEND = 2000;
- final long SLEEP_BETWEEN_SEND_MS = 5;
+ final long SLEEP_BETWEEN_SEND_MS = 3;
final int NUM_SENDERS_AND_RECEIVERS = 10;
final Object brokerLock = new Object();
BrokerService broker;
Vector<Throwable> exceptions = new Vector<Throwable>();
+
+ private File dataDirFile;
public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService();
- broker.setDataDirectory("target/amq-data/" + getName());
+ AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ persistenceFactory.setDataDirectory(dataDirFile);
+ broker.setPersistenceFactory(persistenceFactory);
+
broker.addConnector(BROKER_CONNECTOR);
+ broker.setBrokerName(getName());
+ broker.setDataDirectoryFile(dataDirFile);
if (configurer != null) {
configurer.configure(broker);
}
- broker.setBrokerName(getName());
broker.start();
}
+ public void setUp() throws Exception {
+ dataDirFile = new File("target/"+ getName());
+ }
+
public void tearDown() throws Exception {
synchronized(brokerLock) {
broker.stop();
@@ -138,7 +152,8 @@
+ " in msg: " + message.getJMSMessageID()
+ " expected "
+ nextExpectedSeqNum
- + ", lastId: " + lastId);
+ + ", lastId: " + lastId
+ + ", message:" + message);
fail(queueName + " received " + seqNum + " expected "
+ nextExpectedSeqNum);
}
@@ -189,10 +204,12 @@
LOG.error(queueName + " send error", e);
exceptions.add(e);
}
- try {
- Thread.sleep(SLEEP_BETWEEN_SEND_MS);
- } catch (InterruptedException e) {
- LOG.warn(queueName + " sleep interrupted", e);
+ if (SLEEP_BETWEEN_SEND_MS > 0) {
+ try {
+ Thread.sleep(SLEEP_BETWEEN_SEND_MS);
+ } catch (InterruptedException e) {
+ LOG.warn(queueName + " sleep interrupted", e);
+ }
}
}
try {
@@ -202,13 +219,13 @@
}
}
- public void testOrderWithMemeUsageLimit() throws Exception {
+ public void x_testOrderWithMemeUsageLimit() throws Exception {
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
SystemUsage usage = new SystemUsage();
MemoryUsage memoryUsage = new MemoryUsage();
- memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
+ memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 5 * NUM_SENDERS_AND_RECEIVERS);
usage.setMemoryUsage(memoryUsage);
broker.setSystemUsage(usage);
@@ -222,9 +239,9 @@
public void testOrderWithRestartVMIndex() throws Exception {
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
- AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ AMQPersistenceAdapterFactory persistenceFactory =
+ (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
persistenceFactory.setPersistentIndex(false);
- broker.setPersistenceFactory(persistenceFactory);
broker.deleteAllMessages();
}
});
@@ -232,9 +249,9 @@
final Timer timer = new Timer();
schedualRestartTask(timer, new Configurer() {
public void configure(BrokerService broker) throws Exception {
- AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ AMQPersistenceAdapterFactory persistenceFactory =
+ (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
persistenceFactory.setPersistentIndex(false);
- broker.setPersistenceFactory(persistenceFactory);
}
});
@@ -245,13 +262,31 @@
}
}
+
+ public void x_testOrderWithRestart() throws Exception {
+ createBroker(new Configurer() {
+ public void configure(BrokerService broker) throws Exception {
+ broker.deleteAllMessages();
+ }
+ });
+
+ final Timer timer = new Timer();
+ schedualRestartTask(timer, null);
+
+ try {
+ verifyOrderedMessageReceipt();
+ } finally {
+ timer.cancel();
+ }
+ }
+
public void x_testOrderWithRestartWithForceRecover() throws Exception {
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
- AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ AMQPersistenceAdapterFactory persistenceFactory =
+ (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
persistenceFactory.setForceRecoverReferenceStore(true);
- broker.setPersistenceFactory(persistenceFactory);
broker.deleteAllMessages();
}
});
@@ -259,9 +294,14 @@
final Timer timer = new Timer();
schedualRestartTask(timer, new Configurer() {
public void configure(BrokerService broker) throws Exception {
- AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ AMQPersistenceAdapterFactory persistenceFactory =
+ (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
persistenceFactory.setForceRecoverReferenceStore(true);
- broker.setPersistenceFactory(persistenceFactory);
+// PolicyEntry auditDepthPolicy = new PolicyEntry();
+// auditDepthPolicy.setMaxAuditDepth(2000);
+// PolicyMap policyMap = new PolicyMap();
+// policyMap.setDefaultEntry(auditDepthPolicy);
+// broker.setDestinationPolicy(policyMap);
}
});
@@ -272,12 +312,8 @@
}
}
- private void schedualRestartTask(Timer timer) {
- schedualRestartTask(timer, null);
- }
-
private void schedualRestartTask(final Timer timer, final Configurer configurer) {
- timer.schedule(new TimerTask() {
+ class RestartTask extends TimerTask {
public void run() {
synchronized (brokerLock) {
LOG.info("stopping broker..");
@@ -290,15 +326,20 @@
LOG.info("restarting broker");
try {
createBroker(configurer);
+ broker.waitUntilStarted();
} catch (Exception e) {
LOG.error("ex on broker restart", e);
exceptions.add(e);
}
}
- // do once
- // timer.cancel();
- }
- }, BROKER_STOP_PERIOD, BROKER_STOP_PERIOD);
+ // do it again
+ try {
+ timer.schedule(new RestartTask(), BROKER_STOP_PERIOD);
+ } catch (IllegalStateException ignore_alreadyCancelled) {
+ }
+ }
+ }
+ timer.schedule(new RestartTask(), BROKER_STOP_PERIOD);
}
private void verifyOrderedMessageReceipt() throws Exception {
@@ -314,17 +355,19 @@
threads.add(thread);
}
- final long expiry = System.currentTimeMillis() + 1000 * 60 * 5;
- while(!threads.isEmpty() && !receivers.isEmpty()
- && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
+ final long expiry = System.currentTimeMillis() + 1000 * 60 * 10;
+ while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
Thread sendThread = threads.firstElement();
sendThread.join(1000*10);
if (!sendThread.isAlive()) {
threads.remove(sendThread);
}
-
+ }
+ LOG.info("senders done...");
+
+ while(!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
Receiver receiver = receivers.firstElement();
- if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND) {
+ if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND || !exceptions.isEmpty()) {
receiver.close();
receivers.remove(receiver);
}