You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/02/11 12:35:29 UTC
svn commit: r1069758 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
Author: gtully
Date: Fri Feb 11 11:35:29 2011
New Revision: 1069758
URL: http://svn.apache.org/viewvc?rev=1069758&view=rev
Log:
test that seems to show missing durable messages after restat, needs investigation
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java (with props)
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java?rev=1069758&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java Fri Feb 11 11:35:29 2011
@@ -0,0 +1,827 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+public class DurableSubProcessWithRestartTest {
+
+ public static final long RUNTIME = 5 * 60 * 1000;
+
+ public static final int SERVER_SLEEP = 2 * 1000; // max
+ public static final int CARGO_SIZE = 400; // max
+
+ public static final int MAX_CLIENTS = 5;
+ public static final Random CLIENT_LIFETIME = new Random(30 * 1000,
+ 2 * 60 * 1000);
+ public static final Random CLIENT_ONLINE = new Random(2 * 1000, 15 * 1000);
+ public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 20 * 1000);
+
+ public static final Persistence PERSISTENT_ADAPTER = Persistence.KAHADB;
+ public static final long BROKER_RESTART = 1 * 60 * 1000;
+
+ public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
+ public static final boolean CHECK_REDELIVERY = false;
+
+ private BrokerService broker;
+ private ActiveMQTopic topic;
+
+ private ClientManager clientManager;
+ private Server server;
+ private HouseKeeper houseKeeper;
+
+ private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(
+ true);
+ private int restartCount = 0;
+
+ @Ignore("Needs some more investigation") @Test
+ public void testProcess() {
+ try {
+ server.start();
+ clientManager.start();
+
+ if (ALLOW_SUBSCRIPTION_ABANDONMENT)
+ houseKeeper.start();
+
+ if (BROKER_RESTART <= 0)
+ Thread.sleep(RUNTIME);
+ else {
+ long end = System.currentTimeMillis() + RUNTIME;
+
+ while (true) {
+ long now = System.currentTimeMillis();
+ if (now > end)
+ break;
+
+ now = end - now;
+ now = now < BROKER_RESTART ? now : BROKER_RESTART;
+ Thread.sleep(now);
+
+ restartBroker();
+ }
+ }
+ } catch (Throwable e) {
+ exit("ProcessTest.testProcess failed.", e);
+ }
+
+ processLock.writeLock().lock();
+ System.out.println("DONE.");
+ }
+
+ private void restartBroker() throws Exception {
+ System.out.println("Broker restart: waiting for components.");
+
+ processLock.writeLock().lock();
+ try {
+ destroyBroker();
+ startBroker(false);
+
+ restartCount++;
+ System.out.println("Broker restarted. count: " + restartCount);
+ } finally {
+ processLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Creates batch of messages in a transaction periodically. The last message
+ * in the transaction is always a special message what contains info about
+ * the whole transaction.
+ * <p>
+ * Notifies the clients about the created messages also.
+ */
+ final class Server extends Thread {
+
+ final String url = "vm://"
+ + getName()
+ + "?"
+ + "jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&"
+ + "jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&"
+ + "jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=false&"
+ + "jms.alwaysSyncSend=true&jms.dispatchAsync=false&"
+ + "jms.watchTopicAdvisories=false&"
+ + "waitForStart=200&create=false";
+ final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
+
+ final Object sendMutex = new Object();
+ final String[] cargos = new String[500];
+
+ int transRover = 0;
+ int messageRover = 0;
+
+ public Server() {
+ super("Server");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ DurableSubProcessWithRestartTest.sleepRandom(SERVER_SLEEP);
+
+ processLock.readLock().lock();
+ try {
+ send();
+ } finally {
+ processLock.readLock().unlock();
+ }
+ }
+ } catch (Throwable e) {
+ exit("Server.run failed", e);
+ }
+ }
+
+ public void send() throws JMSException {
+ // do not create new clients now
+ // ToDo: Test this case later.
+ synchronized (sendMutex) {
+ int trans = ++transRover;
+ boolean relevantTrans = random(2) > 1;
+ ClientType clientType = relevantTrans ? ClientType
+ .randomClientType() : null; // sends this types
+ int count = random(200);
+
+ System.out.println("Sending Trans[id=" + trans + ", count="
+ + count + ", clientType=" + clientType + "]");
+
+ Connection con = cf.createConnection();
+ Session sess = con
+ .createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sess.createProducer(null);
+
+ for (int i = 0; i < count; i++) {
+ Message message = sess.createMessage();
+ message.setIntProperty("ID", ++messageRover);
+ message.setIntProperty("TRANS", trans);
+ String type = clientType != null ? clientType
+ .randomMessageType() : ClientType
+ .randomNonRelevantMessageType();
+ message.setStringProperty("TYPE", type);
+
+ if (CARGO_SIZE > 0)
+ message.setStringProperty("CARGO",
+ getCargo(random(CARGO_SIZE)));
+
+ prod.send(topic, message);
+ clientManager.onServerMessage(message);
+ }
+
+ Message message = sess.createMessage();
+ message.setIntProperty("ID", ++messageRover);
+ message.setIntProperty("TRANS", trans);
+ message.setBooleanProperty("COMMIT", true);
+ message.setBooleanProperty("RELEVANT", relevantTrans);
+ prod.send(topic, message);
+ clientManager.onServerMessage(message);
+
+ sess.commit();
+ System.out.println("Committed Trans[id=" + trans + ", count="
+ + count + ", clientType=" + clientType + "], ID=" + messageRover);
+
+ sess.close();
+ con.close();
+ }
+ }
+
+ private String getCargo(int length) {
+ if (length == 0)
+ return null;
+
+ if (length < cargos.length) {
+ String result = cargos[length];
+ if (result == null) {
+ result = getCargoImpl(length);
+ cargos[length] = result;
+ }
+ return result;
+ }
+ return getCargoImpl(length);
+ }
+
+ private String getCargoImpl(int length) {
+ StringBuilder sb = new StringBuilder(length);
+ for (int i = length; --i >= 0;) {
+ sb.append('a');
+ }
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Clients listen on different messages in the topic. The 'TYPE' property
+ * helps the client to select the proper messages.
+ */
+ private enum ClientType {
+ A("a", "b", "c"), B("c", "d", "e"), C("d", "e", "f"), D("g", "h");
+
+ public final String[] messageTypes;
+ public final HashSet<String> messageTypeSet;
+ public final String selector;
+
+ ClientType(String... messageTypes) {
+ this.messageTypes = messageTypes;
+ messageTypeSet = new HashSet<String>(Arrays.asList(messageTypes));
+
+ StringBuilder sb = new StringBuilder("TYPE in (");
+ for (int i = 0; i < messageTypes.length; i++) {
+ if (i > 0)
+ sb.append(", ");
+ sb.append('\'').append(messageTypes[i]).append('\'');
+ }
+ sb.append(')');
+ selector = sb.toString();
+ }
+
+ public static ClientType randomClientType() {
+ return values()[DurableSubProcessWithRestartTest
+ .random(values().length - 1)];
+ }
+
+ public final String randomMessageType() {
+ return messageTypes[DurableSubProcessWithRestartTest
+ .random(messageTypes.length - 1)];
+ }
+
+ public static String randomNonRelevantMessageType() {
+ return Integer
+ .toString(DurableSubProcessWithRestartTest.random(20));
+ }
+
+ public final boolean isRelevant(String messageType) {
+ return messageTypeSet.contains(messageType);
+ }
+
+ @Override
+ public final String toString() {
+ return this.name() /* + '[' + selector + ']' */;
+ }
+ }
+
+ /**
+ * Creates new cliens.
+ */
+ private final class ClientManager extends Thread {
+
+ private int clientRover = 0;
+
+ private final CopyOnWriteArrayList<Client> clients = new CopyOnWriteArrayList<Client>();
+
+ public ClientManager() {
+ super("ClientManager");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ if (clients.size() < MAX_CLIENTS) {
+ processLock.readLock().lock();
+ try {
+ createNewClient();
+ } finally {
+ processLock.readLock().unlock();
+ }
+ }
+
+ int size = clients.size();
+ sleepRandom(size * 3 * 1000, size * 6 * 1000);
+ }
+ } catch (Throwable e) {
+ exit("ClientManager.run failed.", e);
+ }
+ }
+
+ private void createNewClient() throws JMSException {
+ ClientType type = ClientType.randomClientType();
+
+ Client client;
+ synchronized (server.sendMutex) {
+ client = new Client(++clientRover, type, CLIENT_LIFETIME,
+ CLIENT_ONLINE, CLIENT_OFFLINE);
+ clients.add(client);
+ }
+ client.start();
+
+ System.out.println(client.toString() + " created. " + this);
+ }
+
+ public void removeClient(Client client) {
+ clients.remove(client);
+ }
+
+ public void onServerMessage(Message message) throws JMSException {
+ for (Client client : clients) {
+ client.onServerMessage(message);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("ClientManager[count=");
+ sb.append(clients.size());
+ sb.append(", clients=");
+ boolean sep = false;
+ for (Client client : clients) {
+ if (sep)
+ sb.append(", ");
+ else
+ sep = true;
+ sb.append(client.toString());
+ }
+ sb.append(']');
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Consumes massages from a durable subscription. Goes online/offline
+ * periodically. Checks the incoming messages against the sent messages of
+ * the server.
+ */
+ private final class Client extends Thread {
+
+ String url = "failover:(tcp://localhost:61656?wireFormat.maxInactivityDuration=0)?"
+ + "jms.watchTopicAdvisories=false&"
+ + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&"
+ + "jms.producerWindowSize=20971520&"
+ + "jms.copyMessageOnSend=false&"
+ + "initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&"
+ + "useExponentialBackOff=true";
+ final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
+
+ public static final String SUBSCRIPTION_NAME = "subscription";
+
+ private final int id;
+ private final String conClientId;
+
+ private final Random lifetime;
+ private final Random online;
+ private final Random offline;
+
+ private final ClientType clientType;
+ private final String selector;
+
+ private final ConcurrentLinkedQueue<Message> waitingList = new ConcurrentLinkedQueue<Message>();
+ private final HashSet<Integer> processed = CHECK_REDELIVERY ? new HashSet<Integer>(
+ 10000) : null;
+
+ public Client(int id, ClientType clientType, Random lifetime,
+ Random online, Random offline) throws JMSException {
+ super("Client" + id);
+ setDaemon(true);
+
+ this.id = id;
+ conClientId = "cli" + id;
+ this.clientType = clientType;
+ selector = "(COMMIT = true and RELEVANT = true) or "
+ + clientType.selector;
+
+ this.lifetime = lifetime;
+ this.online = online;
+ this.offline = offline;
+
+ subscribe();
+ }
+
+ @Override
+ public void run() {
+ long end = System.currentTimeMillis() + lifetime.next();
+ try {
+ boolean sleep = false;
+ while (true) {
+ long max = end - System.currentTimeMillis();
+ if (max <= 0)
+ break;
+
+ if (sleep)
+ offline.sleepRandom();
+ else
+ sleep = true;
+
+ processLock.readLock().lock();
+ try {
+ process(online.next());
+ } finally {
+ processLock.readLock().unlock();
+ }
+ }
+
+ if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0)
+ unsubscribe();
+ else {
+ System.out.println("Client abandon the subscription. "
+ + this);
+
+ // housekeeper should sweep these abandoned subscriptions
+ houseKeeper.abandonedSubscriptions.add(conClientId);
+ }
+ } catch (Throwable e) {
+ exit(toString() + " failed.", e);
+ }
+
+ clientManager.removeClient(this);
+ System.out.println(toString() + " DONE.");
+ }
+
+ private void process(long millis) throws JMSException {
+ long end = System.currentTimeMillis() + millis;
+ long hardEnd = end + 20000; // wait to finish the transaction.
+ boolean inTransaction = false;
+ int transCount = 0;
+
+ System.out.println(toString() + " ONLINE.");
+ Connection con = openConnection();
+ Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = sess.createDurableSubscriber(topic,
+ SUBSCRIPTION_NAME, selector, false);
+ try {
+ do {
+ long max = end - System.currentTimeMillis();
+ if (max <= 0) {
+ if (!inTransaction)
+ break;
+
+ max = hardEnd - System.currentTimeMillis();
+ if (max <= 0)
+ exit("" + this
+ + " failed: Transaction is not finished.");
+ }
+
+ Message message = consumer.receive(max);
+ if (message == null)
+ continue;
+
+ onClientMessage(message);
+
+ if (message.propertyExists("COMMIT")) {
+ message.acknowledge(); // CLIENT_ACKNOWLEDGE
+
+ System.out.println("Received Trans[id="
+ + message.getIntProperty("TRANS") + ", count="
+ + transCount + "] in " + this + ".");
+
+ inTransaction = false;
+ transCount = 0;
+ } else {
+ inTransaction = true;
+ transCount++;
+ }
+ } while (true);
+ } finally {
+ sess.close();
+ con.close();
+
+ System.out.println(toString() + " OFFLINE.");
+
+ // Check if the messages are in the waiting
+ // list for long time.
+ Message topMessage = waitingList.peek();
+ if (topMessage != null)
+ checkDeliveryTime(topMessage);
+ }
+ }
+
+ public void onServerMessage(Message message) throws JMSException {
+ if (Boolean.TRUE.equals(message.getObjectProperty("COMMIT"))) {
+ if (Boolean.TRUE.equals(message.getObjectProperty("RELEVANT")))
+ waitingList.add(message);
+ } else {
+ String messageType = message.getStringProperty("TYPE");
+ if (clientType.isRelevant(messageType))
+ waitingList.add(message);
+ }
+ }
+
+ public void onClientMessage(Message message) {
+ Message serverMessage = waitingList.poll();
+ try {
+ Integer receivedId = (Integer) message.getObjectProperty("ID");
+ if (processed != null && processed.contains(receivedId))
+ System.out.println("! Message has been processed before. "
+ + this + " message = " + message);
+
+ if (serverMessage == null)
+ exit(""
+ + this
+ + " failed: There is no next server message, but received: "
+ + message);
+
+ Integer serverId = (Integer) serverMessage
+ .getObjectProperty("ID");
+ if (receivedId == null || serverId == null)
+ exit("" + this + " failed: message ID not found.\r\n"
+ + " received: " + message + "\r\n" + " server: "
+ + serverMessage);
+
+ if (!serverId.equals(receivedId))
+ exit("" + this + " failed: Received wrong message.\r\n"
+ + " received: " + message + "\r\n" + " server: "
+ + serverMessage);
+
+ checkDeliveryTime(message);
+
+ if (processed != null)
+ processed.add(receivedId);
+ } catch (Throwable e) {
+ exit("" + this + ".onClientMessage failed.\r\n" + " received: "
+ + message + "\r\n" + " server: " + serverMessage, e);
+ }
+ }
+
+ /**
+ * Checks if the message was not delivered fast enough.
+ */
+ public void checkDeliveryTime(Message message) throws JMSException {
+ long creation = message.getJMSTimestamp();
+ long min = System.currentTimeMillis() - (offline.max + online.min)
+ * (BROKER_RESTART > 0 ? 4 : 1);
+
+ if (false && min > creation) {
+ SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
+ exit("" + this + ".checkDeliveryTime failed. Message time: "
+ + df.format(new Date(creation)) + ", min: "
+ + df.format(new Date(min)) + "\r\n" + message);
+ }
+ }
+
+ private Connection openConnection() throws JMSException {
+ Connection con = cf.createConnection();
+ con.setClientID(conClientId);
+ con.start();
+ return con;
+ }
+
+ private void subscribe() throws JMSException {
+ Connection con = openConnection();
+ Session session = con
+ .createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector,
+ true);
+ session.close();
+ con.close();
+ }
+
+ private void unsubscribe() throws JMSException {
+ Connection con = openConnection();
+ Session session = con
+ .createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.unsubscribe(SUBSCRIPTION_NAME);
+ session.close();
+ con.close();
+ }
+
+ @Override
+ public String toString() {
+ return "Client[id=" + id + ", type=" + clientType + "]";
+ }
+ }
+
+ /**
+ * Sweeps out not-used durable subscriptions.
+ */
+ private final class HouseKeeper extends Thread {
+
+ private HouseKeeper() {
+ super("HouseKeeper");
+ setDaemon(true);
+ }
+
+ public final CopyOnWriteArrayList<String> abandonedSubscriptions = new CopyOnWriteArrayList<String>();
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(3 * 60 * 1000);
+
+ processLock.readLock().lock();
+ try {
+ sweep();
+ } finally {
+ processLock.readLock().unlock();
+ }
+ } catch (InterruptedException ex) {
+ break;
+ } catch (Throwable e) {
+ Exception log = new Exception("HouseKeeper failed.", e);
+ log.printStackTrace();
+ }
+ }
+ }
+
+ private void sweep() throws Exception {
+ System.out.println("Housekeeper sweeping.");
+
+ int closed = 0;
+ ArrayList<String> sweeped = new ArrayList<String>();
+ try {
+ for (String clientId : abandonedSubscriptions) {
+ System.out.println("Sweeping out subscription of "
+ + clientId + ".");
+ broker.getAdminView().destroyDurableSubscriber(clientId,
+ Client.SUBSCRIPTION_NAME);
+ sweeped.add(clientId);
+ closed++;
+ }
+ } catch (Exception ignored) {
+ System.out.println("Ex on destroy sub " + ignored);
+ } finally {
+ abandonedSubscriptions.removeAll(sweeped);
+ }
+
+ System.out.println("Housekeeper sweeped out " + closed
+ + " subscriptions.");
+ }
+ }
+
+ public static int random(int max) {
+ return (int) (Math.random() * (max + 1));
+ }
+
+ public static int random(int min, int max) {
+ return random(max - min) + min;
+ }
+
+ public static void sleepRandom(int maxMillis) throws InterruptedException {
+ Thread.sleep(random(maxMillis));
+ }
+
+ public static void sleepRandom(int minMillis, int maxMillis)
+ throws InterruptedException {
+ Thread.sleep(random(minMillis, maxMillis));
+ }
+
+ public static final class Random {
+
+ final int min;
+ final int max;
+
+ Random(int min, int max) {
+ this.min = min;
+ this.max = max;
+ }
+
+ public int next() {
+ return random(min, max);
+ }
+
+ public void sleepRandom() throws InterruptedException {
+ DurableSubProcessWithRestartTest.sleepRandom(min, max);
+ }
+ }
+
+ public static void exit(String message) {
+ exit(message, null);
+ }
+
+ public static void exit(String message, Throwable e) {
+ Throwable log = new RuntimeException(message, e);
+ log.printStackTrace();
+ System.exit(0);
+ }
+
+ protected void setUp() throws Exception {
+ topic = new ActiveMQTopic("TopicT");
+ startBroker();
+
+ clientManager = new ClientManager();
+ server = new Server();
+ houseKeeper = new HouseKeeper();
+
+ }
+
+ protected void tearDown() throws Exception {
+ destroyBroker();
+ }
+
+ private enum Persistence {
+ MEMORY, AMQ, KAHA, KAHADB
+ }
+
+ private void startBroker() throws Exception {
+ startBroker(true);
+ }
+
+ private void startBroker(boolean deleteAllMessages) throws Exception {
+ if (broker != null)
+ return;
+
+ broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
+ broker.setBrokerName(getName());
+ broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+
+ switch (PERSISTENT_ADAPTER) {
+ case MEMORY:
+ broker.setPersistent(false);
+ break;
+
+ case AMQ:
+ File amqData = new File("activemq-data/" + getName() + "-amq");
+ if (deleteAllMessages)
+ delete(amqData);
+
+ broker.setPersistent(true);
+ AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
+ amq.setDirectory(amqData);
+ broker.setPersistenceAdapter(amq);
+ break;
+
+ case KAHA:
+ File kahaData = new File("activemq-data/" + getName() + "-kaha");
+ if (deleteAllMessages)
+ delete(kahaData);
+
+ broker.setPersistent(true);
+ KahaPersistenceAdapter kaha = new KahaPersistenceAdapter();
+ kaha.setDirectory(kahaData);
+ broker.setPersistenceAdapter(kaha);
+ break;
+
+ case KAHADB:
+ File kahadbData = new File("activemq-data/" + getName() + "-kahadb");
+ if (deleteAllMessages)
+ delete(kahadbData);
+
+ broker.setPersistent(true);
+ KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+ kahadb.setDirectory(kahadbData);
+ kahadb.setJournalMaxFileLength(5 * 1024 * 1024);
+ broker.setPersistenceAdapter(kahadb);
+ break;
+ }
+
+ broker.addConnector("tcp://localhost:61656");
+
+ broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
+ broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024);
+ broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024);
+
+ broker.start();
+ }
+
+ private String getName() {
+ return DurableSubProcessWithRestartTest.class.getName();
+ }
+
+ private static boolean delete(File path) {
+ if (path == null)
+ return true;
+
+ if (path.isDirectory()) {
+ for (File file : path.listFiles()) {
+ delete(file);
+ }
+ }
+ return path.delete();
+ }
+
+ private void destroyBroker() throws Exception {
+ if (broker == null)
+ return;
+
+ broker.stop();
+ broker = null;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessWithRestartTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date