You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/23 15:17:33 UTC
svn commit: r1329225 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubDelayedUnsubscribeTest.java
Author: tabish
Date: Mon Apr 23 13:17:33 2012
New Revision: 1329225
URL: http://svn.apache.org/viewvc?rev=1329225&view=rev
Log:
Additional test for: https://issues.apache.org/jira/browse/AMQ-3775
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubDelayedUnsubscribeTest.java (with props)
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubDelayedUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubDelayedUnsubscribeTest.java?rev=1329225&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubDelayedUnsubscribeTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubDelayedUnsubscribeTest.java Mon Apr 23 13:17:33 2012
@@ -0,0 +1,745 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Vector;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A cut down version of DurableSubProcessWithRestartTest that focuses on kahaDB file retention
+ */
+public class DurableSubDelayedUnsubscribeTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubDelayedUnsubscribeTest.class);
+
+ private static final long RUNTIME = 2 * 60 * 1000;
+ private static final int CARGO_SIZE = 400; // max
+ private static final int MAX_CLIENTS = 15;
+ private static boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
+ private static final Vector<Throwable> exceptions = new Vector<Throwable>();
+
+ private BrokerService broker;
+ private ActiveMQTopic topic;
+
+ private ClientManager clientManager;
+ private Server server;
+ private HouseKeeper houseKeeper;
+
+ private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(true);
+
+ @Test
+ public void testProcess() throws Exception {
+
+ server.start();
+ clientManager.start();
+
+ houseKeeper.start();
+
+ // Sleep to
+ Thread.sleep(RUNTIME);
+
+ // inform message producer to stop
+ server.stopped = true;
+
+ // add one Subscriber to the topic that will not be unsubscribed.
+ // should not have any pending messages in the kahadb store
+ Client lastClient = new Client(32000, ClientType.A);
+ lastClient.process(1000);
+
+ // stop client manager from creating any more clients
+ clientManager.stopped = true;
+
+ final BrokerService brokerService = this.broker;
+
+ // Wait for all client subscription to be unsubscribed or swept away.
+
+ assertTrue("should have only one inactiveSubscriber subscribed but was: " + brokerService.getAdminView().getInactiveDurableTopicSubscribers().length,
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
+ }
+ }, TimeUnit.MINUTES.toMillis(3)));
+
+ assertTrue("should be no subscribers subscribed but was: " + brokerService.getAdminView().getDurableTopicSubscribers().length,
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getDurableTopicSubscribers().length == 0;
+ }
+ }, TimeUnit.MINUTES.toMillis(3)));
+
+ processLock.writeLock().lock();
+
+ // check outcome.
+
+ ObjectName[] subscribers = broker.getAdminView().getDurableTopicSubscribers();
+ ObjectName[] inactiveSubscribers = broker.getAdminView().getInactiveDurableTopicSubscribers();
+ final KahaDBPersistenceAdapter persistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+
+ printDebugClientInfo(subscribers, inactiveSubscribers, persistenceAdapter);
+
+ assertEquals("should have only one inactiveSubscriber subscribed", 1, broker.getAdminView().getInactiveDurableTopicSubscribers().length);
+ assertEquals("should be no subscribers subscribed", 0, broker.getAdminView().getDurableTopicSubscribers().length);
+
+ final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+ assertTrue("should be less than 3 journal file left but was: " + persistenceAdapter.getStore().getJournal().getFileMap().size(),
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return pa.getStore().getJournal().getFileMap().size() <= 3;
+ }
+ }, TimeUnit.MINUTES.toMillis(3)));
+
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+
+ LOG.info("DONE.");
+ }
+
+ private void printDebugClientInfo(ObjectName[] subscribers, ObjectName[] inactiveSubscribers, final KahaDBPersistenceAdapter pa) throws IOException {
+
+ LOG.info("====>>> START DEBUG Subscriber INFO");
+
+ LOG.info("Number of subscribers subscribed as seen through JMX is" + subscribers.length);
+
+ for (int i = 0; i < subscribers.length; i++) {
+ LOG.info("subscribers subscribed as seen throngh JMX: " + subscribers[i]);
+ }
+
+ LOG.info("Number of inactiveSubscribers subscribed as seen through JMX is" + inactiveSubscribers.length);
+
+ for (int i = 0; i < inactiveSubscribers.length; i++) {
+ LOG.info("subscribers subscribed as seen throngh JMX: " + inactiveSubscribers[i]);
+ }
+
+ LOG.info("ClientManager.clients size is" + clientManager.clients.size());
+
+ for (int i = 0; i < clientManager.clients.size(); i++) {
+ LOG.info("clients is: " + clientManager.clients.get(i));
+ }
+
+ LOG.info("housekeep.subscriptions size is " + houseKeeper.abandonedSubscriptions.size());
+
+ for (int i = 0; i < houseKeeper.abandonedSubscriptions.size(); i++) {
+ LOG.info("housekeep is: " + houseKeeper.abandonedSubscriptions.get(i));
+ }
+
+ LOG.info("number of journal files left" + pa.getStore().getJournal().getFileMap().size());
+
+ LOG.info("====>>> END DEBUG Subscriber INFO");
+ }
+
+ /**
+ * Creates batch of messages in a transaction periodically. The last message
+ * in the transaction is always a special message what contains info about
+ * the whole transaction.
+ * <p>
+ * Notifies the clients about the created messages also.
+ */
+ final class Server extends Thread {
+
+ public boolean stopped;
+ final String url = "vm://" + DurableSubDelayedUnsubscribeTest.getName() + "?"
+ + "jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&"
+ + "jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&" + "jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=false&"
+ + "jms.alwaysSyncSend=true&jms.dispatchAsync=false&" + "jms.watchTopicAdvisories=false&" + "waitForStart=200&create=false";
+ final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
+
+ final Object sendMutex = new Object();
+ final String[] cargos = new String[500];
+
+ int transRover = 0;
+ int messageRover = 0;
+
+ public Server() {
+ super("Server");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+
+ if (stopped) {
+ // server should stop producing
+ break;
+ }
+
+ Thread.sleep(500);
+ processLock.readLock().lock();
+ try {
+ send();
+ } finally {
+ processLock.readLock().unlock();
+ }
+ }
+ } catch (Throwable e) {
+ exit("Server.run failed", e);
+ }
+ }
+
+ public void send() throws JMSException {
+ // do not create new clients now
+ // ToDo: Test this case later.
+ synchronized (sendMutex) {
+ int trans = ++transRover;
+ boolean relevantTrans = random(2) > 1;
+ ClientType clientType = relevantTrans ? ClientType.randomClientType() : null; // sends
+ // this
+ // types
+ int count = random(200);
+
+ LOG.info("Sending Trans[id=" + trans + ", count=" + count + ", clientType=" + clientType + "]");
+
+ Connection con = cf.createConnection();
+ Session sess = con.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer prod = sess.createProducer(null);
+
+ for (int i = 0; i < count; i++) {
+ Message message = sess.createMessage();
+ message.setIntProperty("ID", ++messageRover);
+ message.setIntProperty("TRANS", trans);
+ String type = clientType != null ? clientType.randomMessageType() : ClientType.randomNonRelevantMessageType();
+ message.setStringProperty("TYPE", type);
+
+ if (CARGO_SIZE > 0)
+ message.setStringProperty("CARGO", getCargo(random(CARGO_SIZE)));
+
+ prod.send(topic, message);
+
+ }
+
+ Message message = sess.createMessage();
+ message.setIntProperty("ID", ++messageRover);
+ message.setIntProperty("TRANS", trans);
+ message.setBooleanProperty("COMMIT", true);
+ message.setBooleanProperty("RELEVANT", relevantTrans);
+ prod.send(topic, message);
+
+ sess.commit();
+ LOG.info("Committed Trans[id=" + trans + ", count=" + count + ", clientType=" + clientType + "], ID=" + messageRover);
+
+ sess.close();
+ con.close();
+ }
+ }
+
+ private String getCargo(int length) {
+ if (length == 0)
+ return null;
+
+ if (length < cargos.length) {
+ String result = cargos[length];
+ if (result == null) {
+ result = getCargoImpl(length);
+ cargos[length] = result;
+ }
+ return result;
+ }
+ return getCargoImpl(length);
+ }
+
+ private String getCargoImpl(int length) {
+ StringBuilder sb = new StringBuilder(length);
+ for (int i = length; --i >= 0;) {
+ sb.append('a');
+ }
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Clients listen on different messages in the topic. The 'TYPE' property
+ * helps the client to select the proper messages.
+ */
+ private enum ClientType {
+ A("a", "b", "c"), B("c", "d", "e"), C("d", "e", "f"), D("g", "h");
+
+ public final String[] messageTypes;
+
+ public final String selector;
+
+ ClientType(String... messageTypes) {
+ this.messageTypes = messageTypes;
+
+ StringBuilder sb = new StringBuilder("TYPE in (");
+ for (int i = 0; i < messageTypes.length; i++) {
+ if (i > 0)
+ sb.append(", ");
+ sb.append('\'').append(messageTypes[i]).append('\'');
+ }
+ sb.append(')');
+ selector = sb.toString();
+ }
+
+ public static ClientType randomClientType() {
+ return values()[DurableSubDelayedUnsubscribeTest.random(values().length - 1)];
+ }
+
+ public final String randomMessageType() {
+ return messageTypes[DurableSubDelayedUnsubscribeTest.random(messageTypes.length - 1)];
+ }
+
+ public static String randomNonRelevantMessageType() {
+ return Integer.toString(DurableSubDelayedUnsubscribeTest.random(20));
+ }
+
+ @Override
+ public final String toString() {
+ return this.name() /* + '[' + selector + ']' */;
+ }
+ }
+
+ /**
+ * Creates new cliens.
+ */
+ private final class ClientManager extends Thread {
+
+ private int clientRover = 0;
+
+ private final CopyOnWriteArrayList<Client> clients = new CopyOnWriteArrayList<Client>();
+
+ private boolean stopped;
+
+ public ClientManager() {
+ super("ClientManager");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ if (clients.size() < MAX_CLIENTS) {
+
+ if (stopped) {
+ // get out, don't start any more threads
+ break;
+ }
+ processLock.readLock().lock();
+ try {
+ createNewClient();
+ } finally {
+ processLock.readLock().unlock();
+ }
+ }
+
+ int size = clients.size();
+ sleepRandom(size * 3 * 1000, size * 6 * 1000);
+ }
+ } catch (Throwable e) {
+ exit("ClientManager.run failed.", e);
+ }
+ }
+
+ private void createNewClient() throws JMSException {
+ ClientType type = ClientType.randomClientType();
+
+ Client client;
+ synchronized (server.sendMutex) {
+ client = new Client(++clientRover, type);
+ clients.add(client);
+ }
+ client.start();
+
+ LOG.info(client.toString() + " created. " + this);
+ }
+
+ public void removeClient(Client client) {
+ clients.remove(client);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("ClientManager[count=");
+ sb.append(clients.size());
+ sb.append(", clients=");
+ boolean sep = false;
+ for (Client client : clients) {
+ if (sep)
+ sb.append(", ");
+ else
+ sep = true;
+ sb.append(client.toString());
+ }
+ sb.append(']');
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Consumes massages from a durable subscription. Goes online/offline
+ * periodically. Checks the incoming messages against the sent messages of
+ * the server.
+ */
+ private final class Client extends Thread {
+
+ String url = "failover:(tcp://localhost:61656?wireFormat.maxInactivityDuration=0)?" + "jms.watchTopicAdvisories=false&"
+ + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" + "jms.producerWindowSize=20971520&" + "jms.copyMessageOnSend=false&"
+ + "initialReconnectDelay=100&maxReconnectDelay=30000&" + "useExponentialBackOff=true";
+ final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
+
+ public static final String SUBSCRIPTION_NAME = "subscription";
+
+ private final int id;
+ private final String conClientId;
+
+ private final int lifetime = 60 * 1000;
+ private final int online = 1 * 1000;
+ private final int offline = 59 * 1000;
+
+ private final ClientType clientType;
+ private final String selector;
+
+ public Client(int id, ClientType clientType) throws JMSException {
+ super("Client" + id);
+ setDaemon(true);
+
+ this.id = id;
+ conClientId = "cli" + id;
+ this.clientType = clientType;
+ selector = "(COMMIT = true and RELEVANT = true) or " + clientType.selector;
+
+ subscribe();
+ }
+
+ @Override
+ public void run() {
+ // long end = System.currentTimeMillis() + lifetime.next();
+ long end = System.currentTimeMillis() + lifetime;
+ try {
+ boolean sleep = false;
+ while (true) {
+ long max = end - System.currentTimeMillis();
+ if (max <= 0)
+ break;
+
+ if (sleep)
+ Thread.sleep(offline);
+ // offline.sleepRandom();
+ else
+ sleep = true;
+
+ processLock.readLock().lock();
+ try {
+ process(online);
+ } finally {
+ processLock.readLock().unlock();
+ }
+ }
+
+ // 50% unsubscribe, 50% abondon subscription
+ if (!ALLOW_SUBSCRIPTION_ABANDONMENT) {
+ unsubscribe();
+ ALLOW_SUBSCRIPTION_ABANDONMENT = true;
+ } else {
+
+ LOG.info("Client abandon the subscription. " + this);
+
+ // housekeeper should sweep these abandoned subscriptions
+ houseKeeper.abandonedSubscriptions.add(conClientId);
+ ALLOW_SUBSCRIPTION_ABANDONMENT = false;
+ }
+ } catch (Throwable e) {
+ exit(toString() + " failed.", e);
+ }
+
+ clientManager.removeClient(this);
+ LOG.info(toString() + " DONE.");
+ }
+
+ private void process(long processingTime) throws JMSException {
+ long end = System.currentTimeMillis() + processingTime;
+ long hardEnd = end + 20000; // wait to finish the transaction.
+ boolean inTransaction = false;
+ int transCount = 0;
+
+ LOG.info(toString() + " ONLINE.");
+ Connection con = openConnection();
+ Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = sess.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, false);
+ try {
+ do {
+ long max = end - System.currentTimeMillis();
+ if (max <= 0) {
+ if (!inTransaction)
+ break;
+
+ max = hardEnd - System.currentTimeMillis();
+ if (max <= 0)
+ exit("" + this + " failed: Transaction is not finished.");
+ }
+
+ Message message = consumer.receive(max);
+ if (message == null)
+ continue;
+
+ if (message.propertyExists("COMMIT")) {
+ message.acknowledge(); // CLIENT_ACKNOWLEDGE
+
+ LOG.info("Received Trans[id=" + message.getIntProperty("TRANS") + ", count=" + transCount + "] in " + this + ".");
+
+ inTransaction = false;
+ transCount = 0;
+ } else {
+ inTransaction = true;
+ transCount++;
+ }
+ } while (true);
+ } finally {
+ sess.close();
+ con.close();
+ LOG.info(toString() + " OFFLINE.");
+ }
+ }
+
+ private Connection openConnection() throws JMSException {
+ Connection con = cf.createConnection();
+ con.setClientID(conClientId);
+ con.start();
+ return con;
+ }
+
+ private void subscribe() throws JMSException {
+ Connection con = openConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, true);
+ session.close();
+ con.close();
+ }
+
+ private void unsubscribe() throws JMSException {
+ Connection con = openConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.unsubscribe(SUBSCRIPTION_NAME);
+ session.close();
+ con.close();
+ }
+
+ @Override
+ public String toString() {
+ return "Client[id=" + id + ", type=" + clientType + "]";
+ }
+ }
+
+ /**
+ * Sweeps out not-used durable subscriptions.
+ */
+ private final class HouseKeeper extends Thread {
+
+ private HouseKeeper() {
+ super("HouseKeeper");
+ setDaemon(true);
+ }
+
+ public final CopyOnWriteArrayList<String> abandonedSubscriptions = new CopyOnWriteArrayList<String>();
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(3 * 60 * 1000);
+
+ processLock.readLock().lock();
+ try {
+ sweep();
+ } finally {
+ processLock.readLock().unlock();
+ }
+ } catch (InterruptedException ex) {
+ break;
+ } catch (Throwable e) {
+ Exception log = new Exception("HouseKeeper failed.", e);
+ log.printStackTrace();
+ }
+ }
+ }
+
+ private void sweep() throws Exception {
+ LOG.info("Housekeeper sweeping.");
+
+ int closed = 0;
+ ArrayList<String> sweeped = new ArrayList<String>();
+ try {
+ for (String clientId : abandonedSubscriptions) {
+ LOG.info("Sweeping out subscription of " + clientId + ".");
+ broker.getAdminView().destroyDurableSubscriber(clientId, Client.SUBSCRIPTION_NAME);
+ sweeped.add(clientId);
+ closed++;
+ }
+ } catch (Exception ignored) {
+ LOG.info("Ex on destroy sub " + ignored);
+ } finally {
+ abandonedSubscriptions.removeAll(sweeped);
+ }
+
+ LOG.info("Housekeeper sweeped out " + closed + " subscriptions.");
+ }
+ }
+
+ public static int random(int max) {
+ return (int) (Math.random() * (max + 1));
+ }
+
+ public static int random(int min, int max) {
+ return random(max - min) + min;
+ }
+
+ public static void sleepRandom(int maxMillis) throws InterruptedException {
+ Thread.sleep(random(maxMillis));
+ }
+
+ public static void sleepRandom(int minMillis, int maxMillis) throws InterruptedException {
+ Thread.sleep(random(minMillis, maxMillis));
+ }
+
+ public static final class Random {
+
+ final int min;
+ final int max;
+
+ Random(int min, int max) {
+ this.min = min;
+ this.max = max;
+ }
+
+ public int next() {
+ return random(min, max);
+ }
+
+ public void sleepRandom() throws InterruptedException {
+ DurableSubDelayedUnsubscribeTest.sleepRandom(min, max);
+ }
+ }
+
+ public static void exit(String message) {
+ exit(message, null);
+ }
+
+ public static void exit(String message, Throwable e) {
+ Throwable cause = new RuntimeException(message, e);
+ LOG.error(message, cause);
+ exceptions.add(cause);
+ fail(cause.toString());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ topic = new ActiveMQTopic("TopicT");
+ startBroker();
+
+ clientManager = new ClientManager();
+ server = new Server();
+ houseKeeper = new HouseKeeper();
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ destroyBroker();
+ }
+
+ private void startBroker() throws Exception {
+ startBroker(true);
+ }
+
+ private void startBroker(boolean deleteAllMessages) throws Exception {
+ if (broker != null)
+ return;
+
+ broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
+ broker.setBrokerName(getName());
+ broker.setAdvisorySupport(false);
+ broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+
+ File kahadbData = new File("activemq-data/" + getName() + "-kahadb");
+ if (deleteAllMessages)
+ delete(kahadbData);
+
+ broker.setPersistent(true);
+ KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+ kahadb.setDirectory(kahadbData);
+ kahadb.setJournalMaxFileLength(512 * 1024);
+ broker.setPersistenceAdapter(kahadb);
+
+ broker.addConnector("tcp://localhost:61656");
+
+ broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
+ broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024);
+ broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024);
+
+ broker.start();
+ }
+
+ protected static String getName() {
+ return "DurableSubProcessWithRestartTest";
+ }
+
+ private static boolean delete(File path) {
+ if (path == null)
+ return true;
+
+ if (path.isDirectory()) {
+ for (File file : path.listFiles()) {
+ delete(file);
+ }
+ }
+ return path.delete();
+ }
+
+ private void destroyBroker() throws Exception {
+ if (broker == null)
+ return;
+
+ broker.stop();
+ broker = null;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubDelayedUnsubscribeTest.java
------------------------------------------------------------------------------
svn:eol-style = native