You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/05/20 14:02:11 UTC

svn commit: r946600 [2/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Thu May 20 12:02:10 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.store.amq;
 
 import java.io.File;
-
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -35,7 +34,6 @@ import org.apache.activemq.util.IOHelper
  */
 public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
     static final int DEFAULT_MAX_REFERNCE_FILE_LENGTH=2*1024*1024;
-    private TaskRunnerFactory taskRunnerFactory;
     private File dataDirectory;
     private int journalThreadPriority = Thread.MAX_PRIORITY;
     private String brokerName = "localhost";
@@ -56,6 +54,7 @@ public class AMQPersistenceAdapterFactor
     private boolean forceRecoverReferenceStore=false;
     private long checkpointInterval = 1000 * 20;
     private boolean useDedicatedTaskRunner;
+    private TaskRunnerFactory taskRunnerFactory;
 
 
     /**
@@ -82,6 +81,8 @@ public class AMQPersistenceAdapterFactor
         result.setMaxReferenceFileLength(getMaxReferenceFileLength());
         result.setForceRecoverReferenceStore(isForceRecoverReferenceStore());
         result.setRecoverReferenceStore(isRecoverReferenceStore());
+        result.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
+        result.setJournalThreadPriority(getJournalThreadPriority());
         return result;
     }
 
@@ -122,10 +123,6 @@ public class AMQPersistenceAdapterFactor
      * @return the taskRunnerFactory
      */
     public TaskRunnerFactory getTaskRunnerFactory() {
-        if (taskRunnerFactory == null) {
-            taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority,
-                                                      true, 1000, isUseDedicatedTaskRunner());
-        }
         return taskRunnerFactory;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu May 20 12:02:10 2010
@@ -31,7 +31,6 @@ import java.util.concurrent.ThreadFactor
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.activeio.journal.InvalidRecordLocationException;
 import org.apache.activeio.journal.Journal;
 import org.apache.activeio.journal.JournalEventListener;
@@ -64,9 +63,9 @@ import org.apache.activemq.thread.Schedu
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -85,7 +84,7 @@ public class JournalPersistenceAdapter i
 
     private BrokerService brokerService;
 	
-    protected static final Scheduler scheduler = Scheduler.getInstance();
+    protected Scheduler scheduler;
     private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);
 
     private Journal journal;
@@ -97,20 +96,20 @@ public class JournalPersistenceAdapter i
     private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
 
     private SystemUsage usageManager;
-    private long checkpointInterval = 1000 * 60 * 5;
+    private final long checkpointInterval = 1000 * 60 * 5;
     private long lastCheckpointRequest = System.currentTimeMillis();
     private long lastCleanup = System.currentTimeMillis();
     private int maxCheckpointWorkers = 10;
     private int maxCheckpointMessageAddSize = 1024 * 1024;
 
-    private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
+    private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
     private ThreadPoolExecutor checkpointExecutor;
 
     private TaskRunner checkpointTask;
     private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
     private boolean fullCheckPoint;
 
-    private AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean started = new AtomicBoolean(false);
 
     private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
 
@@ -267,7 +266,9 @@ public class JournalPersistenceAdapter i
         recover();
 
         // Do a checkpoint periodically.
-        scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
+        this.scheduler = new Scheduler("Journal Scheduler");
+        this.scheduler.start();
+        this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
 
     }
 
@@ -278,7 +279,8 @@ public class JournalPersistenceAdapter i
             return;
         }
 
-        scheduler.cancel(periodicCheckpointTask);
+        this.scheduler.cancel(periodicCheckpointTask);
+        this.scheduler.stop();
 
         // Take one final checkpoint and stop checkpoint processing.
         checkpoint(true, true);
@@ -723,6 +725,7 @@ public class JournalPersistenceAdapter i
         longTermPersistence.setBrokerName(brokerName);
     }
 
+    @Override
     public String toString() {
         return "JournalPersistenceAdapator(" + longTermPersistence + ")";
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Thu May 20 12:02:10 2010
@@ -415,6 +415,7 @@ public class KahaDBPersistenceAdapter im
     
     @Override
     public String toString() {
-        return "KahaDBPersistenceAdapter";
+        String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
+        return "KahaDBPersistenceAdapter[" + path +"]" ;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu May 20 12:02:10 2010
@@ -72,6 +72,7 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ServiceStopper;
@@ -94,6 +95,7 @@ public class KahaDBStore extends Message
     private boolean concurrentStoreAndDispatchQueues = true;
     private boolean concurrentStoreAndDispatchTopics = true;
     private int maxAsyncJobs = MAX_ASYNC_JOBS;
+    private Scheduler scheduler;
 
     public KahaDBStore() {
 
@@ -155,6 +157,7 @@ public class KahaDBStore extends Message
 
     @Override
     public void doStart() throws Exception {
+        super.doStart();
         this.queueSemaphore = new Semaphore(getMaxAsyncJobs());
         this.topicSemaphore = new Semaphore(getMaxAsyncJobs());
         this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
@@ -175,8 +178,6 @@ public class KahaDBStore extends Message
                         return thread;
                     }
                 });
-        super.doStart();
-
     }
 
     @Override
@@ -204,6 +205,7 @@ public class KahaDBStore extends Message
     protected void addQueueTask(StoreQueueTask task) throws IOException {
         try {
             this.queueSemaphore.acquire();
+
         } catch (InterruptedException e) {
             throw new InterruptedIOException(e.getMessage());
         }
@@ -327,7 +329,6 @@ public class KahaDBStore extends Message
 
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-
             store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
 
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu May 20 12:02:10 2010
@@ -82,7 +82,7 @@ import org.apache.kahadb.util.VariableMa
 
 public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
 	
-	private BrokerService brokerService;
+	protected BrokerService brokerService;
 
     public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
     public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@@ -245,7 +245,6 @@ public class MessageDatabase extends Ser
                     // to see if we need to exit this thread.
                     long sleepTime = Math.min(checkpointInterval, 500);
                     while (opened.get()) {
-                        
                         Thread.sleep(sleepTime);
                         long now = System.currentTimeMillis();
                         if( now - lastCleanup >= cleanupInterval ) {
@@ -276,9 +275,7 @@ public class MessageDatabase extends Ser
 	public void open() throws IOException {
 		if( opened.compareAndSet(false, true) ) {
             getJournal().start();
-            
-	        loadPageFile();
-	        
+	        loadPageFile();        
 	        startCheckpoint();
             recover();
 		}
@@ -332,6 +329,11 @@ public class MessageDatabase extends Ser
 	public void close() throws IOException, InterruptedException {
 		if( opened.compareAndSet(true, false)) {
 	        synchronized (indexMutex) {
+	            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+	                public void execute(Transaction tx) throws IOException {
+	                    checkpointUpdate(tx, true);
+	                }
+	            });
 	            pageFile.unload();
 	            metadata = new Metadata();
 	        }
@@ -385,11 +387,12 @@ public class MessageDatabase extends Ser
      */
     private void recover() throws IllegalStateException, IOException {
         synchronized (indexMutex) {
-	        long start = System.currentTimeMillis();
-	        
+            
+	        long start = System.currentTimeMillis();        
 	        Location recoveryPosition = getRecoveryPosition();
 	        if( recoveryPosition!=null ) {
 		        int redoCounter = 0;
+		        LOG.info("Recoverying from the journal ...");
 		        while (recoveryPosition != null) {
 		            JournalCommand message = load(recoveryPosition);
 		            metadata.lastUpdate = recoveryPosition;
@@ -398,7 +401,7 @@ public class MessageDatabase extends Ser
 		            recoveryPosition = journal.getNextLocation(recoveryPosition);
 		        }
 		        long end = System.currentTimeMillis();
-	        	LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+	        	LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
 	        }
 	     
 	        // We may have to undo some index updates.
@@ -693,7 +696,7 @@ public class MessageDatabase extends Ser
     // from the recovery method too so they need to be idempotent
     // /////////////////////////////////////////////////////////////////
 
-    private void process(JournalCommand data, final Location location) throws IOException {
+    void process(JournalCommand data, final Location location) throws IOException {
         data.visit(new Visitor() {
             @Override
             public void visit(KahaAddMessageCommand command) throws IOException {
@@ -732,11 +735,12 @@ public class MessageDatabase extends Ser
         });
     }
 
-    private void process(final KahaAddMessageCommand command, final Location location) throws IOException {
+    protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
             synchronized (indexMutex) {
                 ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
                 inflightTx.add(new AddOpperation(command, location));
+                TransactionId key = key(command.getTransactionInfo());
             }
         } else {
             synchronized (indexMutex) {
@@ -836,7 +840,7 @@ public class MessageDatabase extends Ser
     protected final Object indexMutex = new Object();
 	private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
 
-    private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
+    void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
 
         // Skip adding the message to the index if this is a topic and there are
@@ -870,7 +874,7 @@ public class MessageDatabase extends Ser
         
     }
 
-    private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
+    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         if (!command.hasSubscriptionKey()) {
             
@@ -902,7 +906,7 @@ public class MessageDatabase extends Ser
         }
     }
 
-    private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
+    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         sd.orderIndex.clear(tx);
         sd.orderIndex.unload(tx);
@@ -931,7 +935,7 @@ public class MessageDatabase extends Ser
         metadata.destinations.remove(tx, key);
     }
 
-    private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
+    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
 
         // If set then we are creating it.. otherwise we are destroying the sub
@@ -961,8 +965,7 @@ public class MessageDatabase extends Ser
      * @param tx
      * @throws IOException
      */
-    private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
-
+    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
         LOG.debug("Checkpoint started.");
         
         metadata.state = OPEN_STATE;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Thu May 20 12:02:10 2010
@@ -19,32 +19,25 @@ package org.apache.activemq.thread;
 import java.util.HashMap;
 import java.util.Timer;
 import java.util.TimerTask;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
 
 /**
- * Singelton, references maintained by users
  * @version $Revision$
  */
-public final class Scheduler { 
-
-	private final Timer CLOCK_DAEMON = new Timer("ActiveMQ Scheduler", true);
-    private final HashMap<Runnable, TimerTask> TIMER_TASKS = new HashMap<Runnable, TimerTask>();
-    private static Scheduler instance;
-    
-    static {
-        instance = new Scheduler();
-    }
+public final class Scheduler extends ServiceSupport { 
+    private final String name;
+	private Timer timer;
+    private final HashMap<Runnable, TimerTask> timerTasks = new HashMap<Runnable, TimerTask>();
     
-    private Scheduler() {
-    }
-
-    public static Scheduler getInstance() {
-        return instance;
+    public Scheduler (String name) {
+        this.name = name;
     }
-    
-    public synchronized void executePeriodically(final Runnable task, long period) {
+        
+    public void executePeriodically(final Runnable task, long period) {
     	TimerTask timerTask = new SchedulerTimerTask(task);
-        CLOCK_DAEMON.scheduleAtFixedRate(timerTask, period, period);
-        TIMER_TASKS.put(task, timerTask);
+        timer.scheduleAtFixedRate(timerTask, period, period);
+        timerTasks.put(task, timerTask);
     }
 
     /*
@@ -53,24 +46,38 @@ public final class Scheduler { 
      */
     public synchronized void schedualPeriodically(final Runnable task, long period) {
         TimerTask timerTask = new SchedulerTimerTask(task);
-        CLOCK_DAEMON.schedule(timerTask, period, period);
-        TIMER_TASKS.put(task, timerTask);
+        timer.schedule(timerTask, period, period);
+        timerTasks.put(task, timerTask);
     }
     
     public synchronized void cancel(Runnable task) {
-    	TimerTask ticket = TIMER_TASKS.remove(task);
+    	TimerTask ticket = timerTasks.remove(task);
         if (ticket != null) {
             ticket.cancel();
-            CLOCK_DAEMON.purge();//remove cancelled TimerTasks
+            timer.purge();//remove cancelled TimerTasks
         }
     }
 
-    public void executeAfterDelay(final Runnable task, long redeliveryDelay) {
+    public synchronized void executeAfterDelay(final Runnable task, long redeliveryDelay) {
     	TimerTask timerTask = new SchedulerTimerTask(task);
-        CLOCK_DAEMON.schedule(timerTask, redeliveryDelay);
+        timer.schedule(timerTask, redeliveryDelay);
     }
     
     public void shutdown() {
-        CLOCK_DAEMON.cancel();
+        timer.cancel();
+    }
+
+    @Override
+    protected synchronized void doStart() throws Exception {
+        this.timer = new Timer(name, true);
+        
+    }
+
+    @Override
+    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
+       if (this.timer != null) {
+           this.timer.cancel();
+       }
+        
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Thu May 20 12:02:10 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.usage;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.activemq.Service;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.kahadb.plist.PListStore;
@@ -36,6 +37,7 @@ public class SystemUsage implements Serv
     private MemoryUsage memoryUsage;
     private StoreUsage storeUsage;
     private TempUsage tempUsage;
+    private ThreadPoolExecutor executor;
 
     /**
      * True if someone called setSendFailIfNoSpace() on this particular usage
@@ -45,7 +47,7 @@ public class SystemUsage implements Serv
     private boolean sendFailIfNoSpace;
     private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet;
     private long sendFailIfNoSpaceAfterTimeout = 0;
-    
+
     private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
 
     public SystemUsage() {
@@ -58,14 +60,21 @@ public class SystemUsage implements Serv
         this.memoryUsage = new MemoryUsage(name + ":memory");
         this.storeUsage = new StoreUsage(name + ":store", adapter);
         this.tempUsage = new TempUsage(name + ":temp", tempStore);
+        this.memoryUsage.setExecutor(getExecutor());
+        this.storeUsage.setExecutor(getExecutor());
+        this.tempUsage.setExecutor(getExecutor());
     }
 
     public SystemUsage(SystemUsage parent, String name) {
         this.parent = parent;
+        this.executor = parent.getExecutor();
         this.name = name;
         this.memoryUsage = new MemoryUsage(parent.memoryUsage, name + ":memory");
         this.storeUsage = new StoreUsage(parent.storeUsage, name + ":store");
         this.tempUsage = new TempUsage(parent.tempUsage, name + ":temp");
+        this.memoryUsage.setExecutor(getExecutor());
+        this.storeUsage.setExecutor(getExecutor());
+        this.tempUsage.setExecutor(getExecutor());
     }
 
     public String getName() {
@@ -186,6 +195,7 @@ public class SystemUsage implements Serv
             memoryUsage.setParent(parent.memoryUsage);
         }
         this.memoryUsage = memoryUsage;
+        this.memoryUsage.setExecutor(getExecutor());
     }
 
     public void setStoreUsage(StoreUsage storeUsage) {
@@ -199,6 +209,7 @@ public class SystemUsage implements Serv
             storeUsage.setParent(parent.storeUsage);
         }
         this.storeUsage = storeUsage;
+        this.storeUsage.setExecutor(executor);
 
     }
 
@@ -213,5 +224,30 @@ public class SystemUsage implements Serv
             tempDiskUsage.setParent(parent.tempUsage);
         }
         this.tempUsage = tempDiskUsage;
+        this.tempUsage.setExecutor(getExecutor());
+    }
+
+    /**
+     * @return the executor
+     */
+    public ThreadPoolExecutor getExecutor() {
+        return this.executor;
+    }
+
+    /**
+     * @param executor
+     *            the executor to set
+     */
+    public void setExecutor(ThreadPoolExecutor executor) {
+        this.executor = executor;
+        if (this.memoryUsage != null) {
+            this.memoryUsage.setExecutor(this.executor);
+        }
+        if (this.storeUsage != null) {
+            this.storeUsage.setExecutor(this.executor);
+        }
+        if (this.tempUsage != null) {
+            this.tempUsage.setExecutor(this.executor);
+        }
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Thu May 20 12:02:10 2010
@@ -21,13 +21,8 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.activemq.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,7 +38,6 @@ import org.apache.commons.logging.LogFac
 public abstract class Usage<T extends Usage> implements Service {
 
     private static final Log LOG = LogFactory.getLog(Usage.class);
-    private static ThreadPoolExecutor executor;
     protected final Object usageMutex = new Object();
     protected int percentUsage;
     protected T parent;
@@ -53,12 +47,11 @@ public abstract class Usage<T extends Us
     private final boolean debug = LOG.isDebugEnabled();
     private String name;
     private float usagePortion = 1.0f;
-    private List<T> children = new CopyOnWriteArrayList<T>();
+    private final List<T> children = new CopyOnWriteArrayList<T>();
     private final List<Runnable> callbacks = new LinkedList<Runnable>();
     private int pollingTime = 100;
-    
-    private AtomicBoolean started=new AtomicBoolean();
-
+    private final AtomicBoolean started=new AtomicBoolean();
+    private ThreadPoolExecutor executor;
     public Usage(T parent, String name, float portion) {
         this.parent = parent;
         this.usagePortion = portion;
@@ -289,6 +282,7 @@ public abstract class Usage<T extends Us
         return name;
     }
 
+    @Override
     public String toString() {
         return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
     }
@@ -411,18 +405,10 @@ public abstract class Usage<T extends Us
         this.parent = parent;
     }
     
-    protected Executor getExecutor() {
-        return executor;
+    public void setExecutor (ThreadPoolExecutor executor) {
+        this.executor = executor;
     }
-    
-    static {
-        executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "Usage Async Task");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
+    public ThreadPoolExecutor getExecutor() {
+        return executor;
     }
-
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java Thu May 20 12:02:10 2010
@@ -16,14 +16,23 @@
  */
 package org.apache.activemq.broker;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
 
 public class BrokerRestartTestSupport extends BrokerTestSupport {
 
     private PersistenceAdapter persistenceAdapter;
 
+    @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
+        File dir = broker.getBrokerDataDirectory();
+        if (dir != null) {
+            IOHelper.deleteChildren(dir);
+        }
         //broker.setPersistent(false);
         broker.setDeleteAllMessagesOnStartup(true);
         persistenceAdapter = broker.getPersistenceAdapter();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Thu May 20 12:02:10 2010
@@ -17,13 +17,14 @@
 package org.apache.activemq.bugs;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -41,9 +42,7 @@ import javax.jms.TopicConnectionFactory;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
-
 import junit.framework.Test;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
@@ -121,7 +120,7 @@ public class DurableConsumerTest extends
     }
     
     private class MessagePublisher implements Runnable{
-        private boolean shouldPublish = true;
+        private final boolean shouldPublish = true;
         
         public void run(){
             TopicConnectionFactory topicConnectionFactory = null;
@@ -170,13 +169,14 @@ public class DurableConsumerTest extends
         
         Thread publisherThread = new Thread(new MessagePublisher());
         publisherThread.start();
-        
+        final List<SimpleTopicSubscriber> list = new ArrayList<SimpleTopicSubscriber>();
         for (int i = 0; i < 100; i++) {
             
             final int id = i;
             Thread thread = new Thread(new Runnable(){
                 public void run(){
-                    new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
+                    SimpleTopicSubscriber s =new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
+                    list.add(s);
                 }
             });
             thread.start();
@@ -189,6 +189,9 @@ public class DurableConsumerTest extends
         configurePersistence(broker);
         broker.start();
         Thread.sleep(10000);
+        for (SimpleTopicSubscriber s:list) {
+            s.closeConnection();
+        }
         assertEquals(0, exceptions.size());
     }
     
@@ -358,6 +361,7 @@ public class DurableConsumerTest extends
         
     }
     
+    @Override
     protected void setUp() throws Exception{
         if (broker == null) {
             broker = createBroker(true);
@@ -366,6 +370,7 @@ public class DurableConsumerTest extends
         super.setUp();
     }
     
+    @Override
     protected void tearDown() throws Exception{
         super.tearDown();
         if (broker != null) {
@@ -392,11 +397,13 @@ public class DurableConsumerTest extends
     protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{
         answer.setDeleteAllMessagesOnStartup(deleteStore);
         KahaDBStore kaha = new KahaDBStore();
+        //kaha.setConcurrentStoreAndDispatchTopics(false);
         File directory = new File("target/activemq-data/kahadb");
         if (deleteStore) {
             IOHelper.deleteChildren(directory);
         }
         kaha.setDirectory(directory);
+        //kaha.setMaxAsyncJobs(10);
         
         answer.setPersistenceAdapter(kaha);
         answer.addConnector(bindAddress);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java Thu May 20 12:02:10 2010
@@ -26,7 +26,6 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.thread.Scheduler;
 import org.apache.log4j.Logger;
 
 public class EmbeddedActiveMQ
@@ -39,6 +38,7 @@ public class EmbeddedActiveMQ
  
                 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                 BrokerService brokerService = null;
+                Connection connection = null;
  
                 logger.info("Start...");
                 try
@@ -49,7 +49,7 @@ public class EmbeddedActiveMQ
                         logger.info("Broker '" + brokerService.getBrokerName() + "' is starting........");
                         brokerService.start();
                         ConnectionFactory fac = new ActiveMQConnectionFactory("vm://TestMQ");
-                        Connection connection = fac.createConnection();
+                        connection = fac.createConnection();
                         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                         Destination queue = session.createQueue("TEST.QUEUE");
                         MessageProducer producer = session.createProducer(queue);
@@ -71,12 +71,9 @@ public class EmbeddedActiveMQ
                         try
                         {
                                 br.close();
-                                Scheduler scheduler = Scheduler.getInstance();
-                                scheduler.shutdown();
                                 logger.info("Broker '" + brokerService.getBrokerName() + "' is stopping........");
-                                brokerService.stop();
-                                Scheduler.getInstance().shutdown();
- 
+                                connection.close();
+                                brokerService.stop(); 
                                 sleep(8);
                                 logger.info(ThreadExplorer.show("Active threads after stop:"));
  
@@ -90,7 +87,7 @@ public class EmbeddedActiveMQ
                 logger.info("Waiting for list theads is greater then 1 ...");
                 int numTh = ThreadExplorer.active();
  
-                while (numTh > 1)
+                while (numTh > 2)
                 {
                         sleep(3);
                         numTh = ThreadExplorer.active();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java Thu May 20 12:02:10 2010
@@ -18,15 +18,16 @@ package org.apache.activemq.network;
 
 import javax.jms.MessageProducer;
 import javax.jms.TemporaryQueue;
-
 import org.apache.activemq.broker.BrokerService;
 
 public class DuplexNetworkTest extends SimpleNetworkTest {
 
+    @Override
     protected String getLocalBrokerURI() {
         return "org/apache/activemq/network/duplexLocalBroker.xml";
     }
 
+    @Override
     protected BrokerService createRemoteBroker() throws Exception {
         BrokerService broker = new BrokerService();
         broker.setBrokerName("remoteBroker");

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Thu May 20 12:02:10 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.network;
 
 import java.net.URI;
-
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -30,9 +29,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.TopicRequestor;
 import javax.jms.TopicSession;
-
 import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -174,11 +171,13 @@ public class SimpleNetworkTest extends T
         }
     }    
     
+    @Override
     protected void setUp() throws Exception {
         super.setUp();
         doSetUp();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         localBroker.deleteAllMessages();
         remoteBroker.deleteAllMessages();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java Thu May 20 12:02:10 2010
@@ -20,10 +20,11 @@ package org.apache.activemq.usage;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,6 +32,7 @@ import org.junit.Test;
 public class MemoryUsageTest {
 
     MemoryUsage underTest;
+    ThreadPoolExecutor executor;
       
     @Test
     public final void testPercentUsageNeedsNoThread() {    
@@ -46,6 +48,7 @@ public class MemoryUsageTest {
     public final void testAddUsageListenerStartsThread() throws Exception {       
         int activeThreadCount = Thread.activeCount();
         underTest = new MemoryUsage();
+        underTest.setExecutor(executor);
         underTest.setLimit(10);
         underTest.start();
         final CountDownLatch called = new CountDownLatch(1);
@@ -66,12 +69,24 @@ public class MemoryUsageTest {
     
     @Before
     public void setUp() throws Exception {
-        underTest = new MemoryUsage();   
+        underTest = new MemoryUsage();
+        this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "Usage Async Task");
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+        underTest.setExecutor(this.executor);
+        
     }
     
     @After
     public void tearDown() {
         assertNotNull(underTest);
         underTest.stop();
+        if (this.executor != null) {
+            this.executor.shutdownNow();
+        }
     }
 }