You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/05/20 14:02:11 UTC

svn commit: r946600 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/...

Author: rajdavies
Date: Thu May 20 12:02:10 2010
New Revision: 946600

URL: http://svn.apache.org/viewvc?rev=946600&view=rev
Log:
fixes for https://issues.apache.org/activemq/browse/AMQ-2620 and
https://issues.apache.org/activemq/browse/AMQ-2568

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/embedded/EmbeddedActiveMQ.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu May 20 12:02:10 2010
@@ -87,6 +87,7 @@ import org.apache.activemq.management.JM
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
 import org.apache.activemq.state.CommandVisitorAdapter;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
@@ -114,7 +115,7 @@ public class ActiveMQConnection implemen
     protected boolean alwaysSessionAsync = true;
 
     private TaskRunnerFactory sessionTaskRunner;
-    private final ThreadPoolExecutor asyncConnectionThread;
+    private final ThreadPoolExecutor executor;
 
     // Connection state variables
     private final ConnectionInfo info;
@@ -188,6 +189,7 @@ public class ActiveMQConnection implemen
     private boolean useDedicatedTaskRunner;
     protected volatile CountDownLatch transportInterruptionProcessingComplete;
     private long consumerFailoverRedeliveryWaitPeriod;
+    private final Scheduler scheduler;
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -204,16 +206,16 @@ public class ActiveMQConnection implemen
 
         // Configure a single threaded executor who's core thread can timeout if
         // idle
-        asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport);
+                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                 thread.setDaemon(true);
                 return thread;
             }
         });
         // asyncConnectionThread.allowCoreThreadTimeOut(true);
-
-        this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
+        String uniqueId = CONNECTION_ID_GENERATOR.generateId();
+        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
         this.info.setManageable(true);
         this.info.setFaultTolerant(transport.isFaultTolerant());
         this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
@@ -224,6 +226,8 @@ public class ActiveMQConnection implemen
         this.factoryStats.addConnection(this);
         this.timeCreated = System.currentTimeMillis();
         this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
+        this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
+        this.scheduler.start();
     }
 
     protected void setUserName(String userName) {
@@ -609,6 +613,14 @@ public class ActiveMQConnection implemen
                         advisoryConsumer.dispose();
                         advisoryConsumer = null;
                     }
+                    if (this.scheduler != null) {
+                        try {
+                            this.scheduler.stop();
+                        } catch (Exception e) {
+                            JMSException ex =  JMSExceptionSupport.create(e);
+                            throw ex;
+                        }
+                    }
 
                     long lastDeliveredSequenceId = 0;
                     for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
@@ -656,8 +668,8 @@ public class ActiveMQConnection implemen
             }
         } finally {
             try {
-                if (asyncConnectionThread != null){
-                    asyncConnectionThread.shutdown();
+                if (executor != null){
+                    executor.shutdown();
                 }
             }catch(Throwable e) {
                 LOG.error("Error shutting down thread pool " + e,e);
@@ -1719,7 +1731,7 @@ public class ActiveMQConnection implemen
 
                     @Override
                     public Response processConnectionError(final ConnectionError error) throws Exception {
-                        asyncConnectionThread.execute(new Runnable() {
+                        executor.execute(new Runnable() {
                             public void run() {
                                 onAsyncException(error.getException());
                             }
@@ -1779,7 +1791,7 @@ public class ActiveMQConnection implemen
     public void onClientInternalException(final Throwable error) {
         if ( !closed.get() && !closing.get() ) {
             if ( this.clientInternalExceptionListener != null ) {
-                asyncConnectionThread.execute(new Runnable() {
+                executor.execute(new Runnable() {
                     public void run() {
                         ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
                     }
@@ -1804,7 +1816,7 @@ public class ActiveMQConnection implemen
                 }
                 final JMSException e = (JMSException)error;
 
-                asyncConnectionThread.execute(new Runnable() {
+                executor.execute(new Runnable() {
                     public void run() {
                         ActiveMQConnection.this.exceptionListener.onException(e);
                     }
@@ -1819,7 +1831,7 @@ public class ActiveMQConnection implemen
     public void onException(final IOException error) {
 		onAsyncException(error);
 		if (!closing.get() && !closed.get()) {
-			asyncConnectionThread.execute(new Runnable() {
+			executor.execute(new Runnable() {
 				public void run() {
 					transportFailed(error);
 					ServiceSupport.dispose(ActiveMQConnection.this.transport);
@@ -2297,4 +2309,12 @@ public class ActiveMQConnection implemen
     public long getConsumerFailoverRedeliveryWaitPeriod() {
         return consumerFailoverRedeliveryWaitPeriod;
     }
+    
+    protected Scheduler getScheduler() {
+        return this.scheduler;
+    }
+    
+    protected ThreadPoolExecutor getExecutor() {
+        return this.executor;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu May 20 12:02:10 2010
@@ -29,7 +29,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
@@ -37,7 +36,6 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.TransactionRolledBackException;
-
 import org.apache.activemq.blob.BlobDownloader;
 import org.apache.activemq.command.ActiveMQBlobMessage;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -111,7 +109,7 @@ public class ActiveMQMessageConsumer imp
     }
 
     private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class);
-    protected static final Scheduler scheduler = Scheduler.getInstance();
+    protected final Scheduler scheduler;
     protected final ActiveMQSession session;
     protected final ConsumerInfo info;
 
@@ -130,17 +128,17 @@ public class ActiveMQMessageConsumer imp
     private int ackCounter;
     private int dispatchedCount;
     private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
-    private JMSConsumerStatsImpl stats;
+    private final JMSConsumerStatsImpl stats;
 
     private final String selector;
     private boolean synchronizationRegistered;
-    private AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean started = new AtomicBoolean(false);
 
     private MessageAvailableListener availableListener;
 
     private RedeliveryPolicy redeliveryPolicy;
     private boolean optimizeAcknowledge;
-    private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
+    private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
     private ExecutorService executorService;
     private MessageTransformer transformer;
     private boolean clearDispatchList;
@@ -152,7 +150,7 @@ public class ActiveMQMessageConsumer imp
     private IOException failureError;
     
     private long optimizeAckTimestamp = System.currentTimeMillis();
-    private long optimizeAckTimeout = 300;
+    private final long optimizeAckTimeout = 300;
     private long failoverRedeliveryWaitPeriod = 0;
 
     /**
@@ -202,6 +200,7 @@ public class ActiveMQMessageConsumer imp
         }
 
         this.session = session;
+        this.scheduler = session.getScheduler();
         this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
         setTransformer(session.getTransformer());
 
@@ -634,10 +633,12 @@ public class ActiveMQMessageConsumer imp
         if (!unconsumedMessages.isClosed()) {
             if (session.getTransactionContext().isInTransaction()) {
                 session.getTransactionContext().addSynchronization(new Synchronization() {
+                    @Override
                     public void afterCommit() throws Exception {
                         doClose();
                     }
 
+                    @Override
                     public void afterRollback() throws Exception {
                         doClose();
                     }
@@ -912,16 +913,19 @@ public class ActiveMQMessageConsumer imp
             if (!synchronizationRegistered) {
                 synchronizationRegistered = true;
                 session.getTransactionContext().addSynchronization(new Synchronization() {
+                    @Override
                     public void beforeEnd() throws Exception {
                         acknowledge();
                         synchronizationRegistered = false;
                     }
 
+                    @Override
                     public void afterCommit() throws Exception {
                         commit();
                         synchronizationRegistered = false;
                     }
 
+                    @Override
                     public void afterRollback() throws Exception {
                         rollback();
                         synchronizationRegistered = false;
@@ -1325,6 +1329,7 @@ public class ActiveMQMessageConsumer imp
         unconsumedMessages.stop();
     }
 
+    @Override
     public String toString() {
         return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
                + " }";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Thu May 20 12:02:10 2010
@@ -19,13 +19,11 @@ package org.apache.activemq;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Message;
-
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
@@ -73,9 +71,9 @@ public class ActiveMQMessageProducer ext
     protected ProducerInfo info;
     protected boolean closed;
 
-    private JMSProducerStatsImpl stats;
+    private final JMSProducerStatsImpl stats;
     private AtomicLong messageSequence;
-    private long startTime;
+    private final long startTime;
     private MessageTransformer transformer;
     private MemoryUsage producerWindow;
 
@@ -93,6 +91,7 @@ public class ActiveMQMessageProducer ext
         // size > 0
         if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
             producerWindow = new MemoryUsage("Producer Window: " + producerId);
+            producerWindow.setExecutor(session.getConnectionExecutor());
             producerWindow.setLimit(this.info.getWindowSize());
             producerWindow.start();
         }
@@ -164,6 +163,7 @@ public class ActiveMQMessageProducer ext
      * 
      * @throws IllegalStateException
      */
+    @Override
     protected void checkClosed() throws IllegalStateException {
         if (closed) {
             throw new IllegalStateException("The producer is closed");
@@ -280,6 +280,7 @@ public class ActiveMQMessageProducer ext
         this.info = info;
     }
 
+    @Override
     public String toString() {
         return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu May 20 12:02:10 2010
@@ -24,8 +24,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -53,7 +53,6 @@ import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 import javax.jms.TransactionRolledBackException;
-
 import org.apache.activemq.blob.BlobDownloader;
 import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.activemq.blob.BlobUploader;
@@ -198,7 +197,8 @@ public class ActiveMQSession implements 
     }
 
     private static final Log LOG = LogFactory.getLog(ActiveMQSession.class);
-    protected static final Scheduler scheduler = Scheduler.getInstance();
+    private final Scheduler scheduler;
+    private final ThreadPoolExecutor connectionExecutor;
 
     protected int acknowledgementMode;
     protected final ActiveMQConnection connection;
@@ -220,7 +220,7 @@ public class ActiveMQSession implements 
     protected Object sendMutex = new Object();
 
     private MessageListener messageListener;
-    private JMSSessionStatsImpl stats;
+    private final JMSSessionStatsImpl stats;
     private TransactionContext transactionContext;
     private DeliveryListener deliveryListener;
     private MessageTransformer transformer;
@@ -251,7 +251,8 @@ public class ActiveMQSession implements 
         this.connection.asyncSendPacket(info);
         setTransformer(connection.getTransformer());
         setBlobTransferPolicy(connection.getBlobTransferPolicy());
-
+        this.scheduler=connection.getScheduler();
+        this.connectionExecutor=connection.getExecutor();
         if (connection.isStarted()) {
             start();
         }
@@ -613,11 +614,13 @@ public class ActiveMQSession implements 
                     synchronizationRegistered = true;
                     getTransactionContext().addSynchronization(new Synchronization() {
 
+                                        @Override
                                         public void afterCommit() throws Exception {
                                             doClose();
                                             synchronizationRegistered = false;
                                         }
 
+                                        @Override
                                         public void afterRollback() throws Exception {
                                             doClose();
                                             synchronizationRegistered = false;
@@ -846,6 +849,7 @@ public class ActiveMQSession implements 
                 if (ack.getTransactionId() != null) {
                     getTransactionContext().addSynchronization(new Synchronization() {
 
+                        @Override
                         public void afterRollback() throws Exception {
                             md.getMessage().onMessageRolledBack();
                             // ensure we don't filter this as a duplicate
@@ -1947,6 +1951,7 @@ public class ActiveMQSession implements 
         return executor.getUnconsumedMessages();
     }
 
+    @Override
     public String toString() {
         return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
     }
@@ -2025,4 +2030,12 @@ public class ActiveMQSession implements 
             syncSendPacket(ack);
         }
     }
+    
+    protected Scheduler getScheduler() {
+        return this.scheduler;
+    }
+    
+    protected ThreadPoolExecutor getConnectionExecutor() {
+        return this.connectionExecutor;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu May 20 12:02:10 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.broker;
 
 import java.net.URI;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -33,6 +34,7 @@ import org.apache.activemq.command.Produ
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -372,6 +374,10 @@ public interface Broker extends Region, 
      *  configuration
      */
     void nowMasterBroker();
+    
+    Scheduler getScheduler();
+    
+    ThreadPoolExecutor getExecutor();
 
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Thu May 20 12:02:10 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.broker;
 import java.net.URI;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -40,6 +41,7 @@ import org.apache.activemq.command.Respo
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -300,4 +302,12 @@ public class BrokerFilter implements Bro
             ConsumerControl control) {
         next.processConsumerControl(consumerExchange, control);
     }
+
+    public Scheduler getScheduler() {
+       return next.getScheduler();
+    }
+
+    public ThreadPoolExecutor getExecutor() {
+       return next.getExecutor();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu May 20 12:02:10 2010
@@ -29,6 +29,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.PostConstruct;
@@ -78,9 +81,10 @@ import org.apache.activemq.security.Secu
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterFactory;
-import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
@@ -188,9 +192,10 @@ public class BrokerService implements Se
     private IOExceptionHandler ioExceptionHandler;
     private boolean schedulerSupport = true;
     private File schedulerDirectoryFile;
-    
+    private Scheduler scheduler;
+    private ThreadPoolExecutor executor;
     private boolean slave = true;
-
+    
 	static {
         String localHostName = "localhost";
         try {
@@ -589,6 +594,15 @@ public class BrokerService implements Se
                 }
             }
         }
+        if (this.taskRunnerFactory != null) {
+            this.taskRunnerFactory.shutdown();
+        }
+        if (this.scheduler != null) {
+            this.scheduler.stop();
+        }
+        if (this.executor != null) {
+            this.executor.shutdownNow();
+        }
         LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
         synchronized (shutdownHooks) {
             for (Runnable hook : shutdownHooks) {
@@ -756,9 +770,6 @@ public class BrokerService implements Se
     }
 
     public PersistenceAdapterFactory getPersistenceFactory() {
-        if (persistenceFactory == null) {
-            persistenceFactory = createPersistenceFactory();
-        }
         return persistenceFactory;
     }
 
@@ -848,6 +859,7 @@ public class BrokerService implements Se
         try {
             if (systemUsage == null) {
                 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
+                systemUsage.setExecutor(getExecutor());
                 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
                                                                          // 64
                                                                          // Meg
@@ -869,6 +881,9 @@ public class BrokerService implements Se
             removeService(this.systemUsage);
         }
         this.systemUsage = memoryManager;
+        if (this.systemUsage.getExecutor()==null) {
+            this.systemUsage.setExecutor(getExecutor());
+        }
         addService(this.systemUsage);
     }
 
@@ -953,11 +968,11 @@ public class BrokerService implements Se
     }
 
     public TaskRunnerFactory getTaskRunnerFactory() {
-        if (taskRunnerFactory == null) {
-            taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000,
+        if (this.taskRunnerFactory == null) {
+            this.taskRunnerFactory = new TaskRunnerFactory("BrokerService", getTaskRunnerPriority(), true, 1000,
                     isDedicatedTaskRunner());
         }
-        return taskRunnerFactory;
+        return this.taskRunnerFactory;
     }
 
     public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
@@ -1769,10 +1784,10 @@ public class BrokerService implements Se
         RegionBroker regionBroker;
         if (isUseJmx()) {
             regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
-                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
+                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
         } else {
             regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
-                    destinationInterceptor);
+                    destinationInterceptor,getScheduler(),getExecutor());
         }
         destinationFactory.setRegionBroker(regionBroker);
         regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
@@ -1850,20 +1865,20 @@ public class BrokerService implements Se
 
     protected PersistenceAdapter createPersistenceAdapter() throws IOException {
         if (isPersistent()) {
-            return getPersistenceFactory().createPersistenceAdapter();
+            PersistenceAdapterFactory fac = getPersistenceFactory();
+            if (fac != null) {
+                return fac.createPersistenceAdapter();
+            }else {
+                KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
+                File dir = new File(getBrokerDataDirectory(),"KahaDB");
+                adaptor.setDirectory(dir);
+                return adaptor;
+            }
         } else {
             return new MemoryPersistenceAdapter();
         }
     }
 
-    protected AMQPersistenceAdapterFactory createPersistenceFactory() {
-        AMQPersistenceAdapterFactory factory = new AMQPersistenceAdapterFactory();
-        factory.setDataDirectory(getBrokerDataDirectory());
-        factory.setTaskRunnerFactory(getPersistenceTaskRunnerFactory());
-        factory.setBrokerName(getBrokerName());
-        return factory;
-    }
-
     protected ObjectName createBrokerObjectName() throws IOException {
         try {
             return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
@@ -2124,6 +2139,31 @@ public class BrokerService implements Se
             }
         }
     }
+    
+    protected synchronized ThreadPoolExecutor getExecutor() {
+        if (this.executor == null) {
+        this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "Usage Async Task");
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+        }
+        return this.executor;
+    }
+    
+    protected synchronized Scheduler getScheduler() {
+        if (this.scheduler==null) {
+            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
+            try {
+                this.scheduler.start();
+            } catch (Exception e) {
+               LOG.error("Failed to start Scheduler ",e);
+            }
+        }
+        return this.scheduler;
+    }
 
     public Broker getRegionBroker() {
         return regionBroker;
@@ -2251,7 +2291,5 @@ public class BrokerService implements Se
     
     public void setSchedulerDirectory(String schedulerDirectory) {
         setSchedulerDirectoryFile(new File(schedulerDirectory));
-    }
-    
-   
+    }   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Thu May 20 12:02:10 2010
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -41,6 +42,7 @@ import org.apache.activemq.command.Respo
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -283,4 +285,12 @@ public class EmptyBroker implements Brok
     public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
             ConsumerControl control) {     
     }
+
+    public Scheduler getScheduler() {
+        return null;
+    }
+
+    public ThreadPoolExecutor getExecutor() {
+        return null;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Thu May 20 12:02:10 2010
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -41,6 +42,7 @@ import org.apache.activemq.command.Respo
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -302,4 +304,12 @@ public class ErrorBroker implements Brok
             ConsumerControl control) {
         throw new BrokerStoppedException(this.message);
     }
+
+    public Scheduler getScheduler() {
+        throw new BrokerStoppedException(this.message);
+    }
+
+    public ThreadPoolExecutor getExecutor() {
+        throw new BrokerStoppedException(this.message);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Thu May 20 12:02:10 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.broker;
 import java.net.URI;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -41,6 +42,7 @@ import org.apache.activemq.command.Respo
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -312,4 +314,12 @@ public class MutableBrokerFilter impleme
         getNext().processConsumerControl(consumerExchange, control);
     }
 
+    public Scheduler getScheduler() {
+       return getNext().getScheduler();
+    }
+
+    public ThreadPoolExecutor getExecutor() {
+       return getNext().getExecutor();
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu May 20 12:02:10 2010
@@ -16,6 +16,28 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ThreadPoolExecutor;
+import javax.management.InstanceNotFoundException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -41,6 +63,7 @@ import org.apache.activemq.command.Subsc
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.JMXSupport;
@@ -48,27 +71,6 @@ import org.apache.activemq.util.ServiceS
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import javax.management.InstanceNotFoundException;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
 
 public class ManagedRegionBroker extends RegionBroker {
     private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class);
@@ -91,18 +93,20 @@ public class ManagedRegionBroker extends
     private Broker contextBroker;
 
     public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
-                               DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
-        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor);
+                               DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
+        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
         this.managementContext = context;
         this.brokerObjectName = brokerObjectName;
     }
 
+    @Override
     public void start() throws Exception {
         super.start();
         // build all existing durable subscriptions
         buildExistingSubscriptions();
     }
 
+    @Override
     protected void doStop(ServiceStopper stopper) {
         super.doStop(stopper);
         // lets remove any mbeans not yet removed
@@ -119,18 +123,22 @@ public class ManagedRegionBroker extends
         registeredMBeans.clear();
     }
 
+    @Override
     protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
+    @Override
     protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
+    @Override
     protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
+    @Override
     protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu May 20 12:02:10 2010
@@ -17,9 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-
 import javax.jms.ResourceAllocationException;
-
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Thu May 20 12:02:10 2010
@@ -55,6 +55,7 @@ public class DestinationFactoryImpl exte
         this.persistenceAdapter = persistenceAdapter;
     }
 
+    @Override
     public void setRegionBroker(RegionBroker broker) {
         if (broker == null) {
             throw new IllegalArgumentException("null broker");
@@ -62,6 +63,7 @@ public class DestinationFactoryImpl exte
         this.broker = broker;
     }
 
+    @Override
     public Set<ActiveMQDestination> getDestinations() {
         return persistenceAdapter.getDestinations();
     }
@@ -69,6 +71,7 @@ public class DestinationFactoryImpl exte
     /**
      * @return instance of {@link Queue} or {@link Topic}
      */
+    @Override
     public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception {
         if (destination.isQueue()) {
             if (destination.isTemporary()) {
@@ -100,6 +103,7 @@ public class DestinationFactoryImpl exte
         }
     }
 
+    @Override
     public void removeDestination(Destination dest) {
         ActiveMQDestination destination = dest.getActiveMQDestination();
         if (!destination.isTemporary()) {
@@ -131,11 +135,12 @@ public class DestinationFactoryImpl exte
         if (broker.getDestinationPolicy() != null) {
             PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
             if (entry != null) {
-                entry.configure(topic);
+                entry.configure(broker,topic);
             }
         }
     }
 
+    @Override
     public long getLastMessageBrokerSequenceId() throws IOException {
         return persistenceAdapter.getLastMessageBrokerSequenceId();
     }
@@ -144,6 +149,7 @@ public class DestinationFactoryImpl exte
         return persistenceAdapter;
     }
 
+    @Override
     public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException {
         return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu May 20 12:02:10 2010
@@ -23,10 +23,8 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -55,7 +53,7 @@ import org.apache.commons.logging.LogFac
 public abstract class PrefetchSubscription extends AbstractSubscription {
 
     private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
-    protected static final Scheduler scheduler = Scheduler.getInstance();
+    protected final Scheduler scheduler;
     
     protected PendingMessageCursor pending;
     protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
@@ -70,12 +68,13 @@ public abstract class PrefetchSubscripti
     private final Object pendingLock = new Object();
     private final Object dispatchLock = new Object();
     protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
-    private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
+    private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
     
     public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
         super(broker,context, info);
         this.usageManager=usageManager;
         pending = cursor;
+        this.scheduler = broker.getScheduler();
     }
 
     public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
@@ -230,6 +229,7 @@ public abstract class PrefetchSubscripti
                             context.getTransaction().addSynchronization(
                                     new Synchronization() {
 
+                                        @Override
                                         public void afterCommit()
                                                 throws Exception {
                                             synchronized(dispatchLock) {
@@ -239,6 +239,7 @@ public abstract class PrefetchSubscripti
                                             }
                                         }
 
+                                        @Override
                                         public void afterRollback() throws Exception {
                                             synchronized(dispatchLock) {
                                                 if (isSlave()) {
@@ -486,6 +487,7 @@ public abstract class PrefetchSubscripti
         return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
     }
 
+    @Override
     public int countBeforeFull() {
         return info.getPrefetchSize() + prefetchExtension - dispatched.size();
     }
@@ -510,6 +512,7 @@ public abstract class PrefetchSubscripti
         return enqueueCounter;
     }
 
+    @Override
     public boolean isRecoveryRequired() {
         return pending.isRecoveryRequired();
     }
@@ -526,13 +529,15 @@ public abstract class PrefetchSubscripti
         }
     }
 
-   public void add(ConnectionContext context, Destination destination) throws Exception {
+   @Override
+public void add(ConnectionContext context, Destination destination) throws Exception {
         synchronized(pendingLock) {
             super.add(context, destination);
             pending.add(context, destination);
         }
     }
 
+    @Override
     public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
         List<MessageReference> rc = new ArrayList<MessageReference>();
         synchronized(pendingLock) {
@@ -546,7 +551,7 @@ public abstract class PrefetchSubscripti
             synchronized(dispatchLock) {
 	            for (MessageReference r : dispatched) {
 	                if( r.getRegionDestination() == destination) {
-	                	rc.add((QueueMessageReference)r);
+	                	rc.add(r);
 	                }
 	            }
                 destination.getDestinationStatistics().getDispatched().subtract(dispatched.size());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu May 20 12:02:10 2010
@@ -125,7 +125,7 @@ public class Queue extends BaseDestinati
     };
     
     private final Object iteratingMutex = new Object() {};
-    private static final Scheduler scheduler = Scheduler.getInstance();
+    private final Scheduler scheduler;
     
     class TimeoutMessage implements Delayed {
 
@@ -203,6 +203,7 @@ public class Queue extends BaseDestinati
         super(brokerService, store, destination, parentStats);
         this.taskFactory = taskFactory;
         this.dispatchSelector = new QueueDispatchSelector(destination);
+        this.scheduler = brokerService.getBroker().getScheduler();
     }
 
     public List<Subscription> getConsumers() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu May 20 12:02:10 2010
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
@@ -57,6 +58,7 @@ import org.apache.activemq.command.Respo
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.BrokerSupport;
@@ -98,10 +100,14 @@ public class RegionBroker extends EmptyB
     private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
     private final DestinationInterceptor destinationInterceptor;
     private ConnectionContext adminConnectionContext;
+    private final Scheduler scheduler;
+    private final ThreadPoolExecutor executor;
 
     public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
-                        DestinationInterceptor destinationInterceptor) throws IOException {
+                        DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
         this.brokerService = brokerService;
+        this.executor=executor;
+        this.scheduler = scheduler;
         if (destinationFactory == null) {
             throw new IllegalArgumentException("null destinationFactory");
         }
@@ -810,6 +816,16 @@ public class RegionBroker extends EmptyB
         }
     }
     
+    
+    @Override
+    public Scheduler getScheduler() {
+        return this.scheduler;
+    }
+    
+    public ThreadPoolExecutor getExecutor() {
+        return this.executor;
+    }
+    
     @Override
     public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
         ActiveMQDestination destination = control.getDestination();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Thu May 20 12:02:10 2010
@@ -231,7 +231,7 @@ public class TopicRegion extends Abstrac
         if (broker.getDestinationPolicy() != null) {
             PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
             if (entry != null) {
-                entry.configure(topic);
+                entry.configure(broker,topic);
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java Thu May 20 12:02:10 2010
@@ -5,7 +5,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Subscription;
@@ -24,15 +23,19 @@ public class AbortSlowConsumerStrategy i
     
     private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class);
 
-    private static final Scheduler scheduler = Scheduler.getInstance();
-    private AtomicBoolean taskStarted = new AtomicBoolean(false);
-    private Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
+    private Scheduler scheduler;
+    private final AtomicBoolean taskStarted = new AtomicBoolean(false);
+    private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
 
     private long maxSlowCount = -1;
     private long maxSlowDuration = 30*1000;
     private long checkPeriod = 30*1000;
     private boolean abortConnection = false;
 
+   public void setScheduler(Scheduler s) {
+       this.scheduler=s;
+   }
+   
     public void slowConsumer(ConnectionContext context, Subscription subs) {
         if (maxSlowCount < 0 && maxSlowDuration < 0) {
             // nothing to do

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010
@@ -18,7 +18,7 @@ package org.apache.activemq.broker.regio
 
 import java.util.ArrayList;
 import java.util.List;
-
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.SubscriptionRecovery;
@@ -118,4 +118,7 @@ public class FixedCountSubscriptionRecov
         return result.toArray(new Message[result.size()]);
     }
 
+    public void setBroker(Broker broker) {        
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
 
 import java.util.Iterator;
 import java.util.List;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.SubscriptionRecovery;
@@ -109,6 +110,9 @@ public class FixedSizedSubscriptionRecov
     public Message[] browse(ActiveMQDestination destination) throws Exception {
         return buffer.browse(destination);
     }
+    
+    public void setBroker(Broker broker) {        
+    }
 
     // Implementation methods
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.SubscriptionRecovery;
@@ -68,5 +69,8 @@ public class LastImageSubscriptionRecove
     public SubscriptionRecoveryPolicy copy() {
         return new LastImageSubscriptionRecoveryPolicy();
     }
+    
+    public void setBroker(Broker broker) {        // intentionally empty: the broker argument is not stored or used
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.SubscriptionRecovery;
@@ -52,5 +53,8 @@ public class NoSubscriptionRecoveryPolic
     public Message[] browse(ActiveMQDestination dest) throws Exception {
         return new Message[0];
     }
+    
+    public void setBroker(Broker broker) {        // intentionally empty: the broker argument is not stored or used
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu May 20 12:02:10 2010
@@ -90,7 +90,7 @@ public class PolicyEntry extends Destina
     
    
     public void configure(Broker broker,Queue queue) {
-        baseConfiguration(queue);
+        baseConfiguration(broker,queue);
         if (dispatchPolicy != null) {
             queue.setDispatchPolicy(dispatchPolicy);
         }
@@ -112,14 +112,16 @@ public class PolicyEntry extends Destina
         queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
     }
 
-    public void configure(Topic topic) {
-        baseConfiguration(topic);
+    public void configure(Broker broker,Topic topic) {
+        baseConfiguration(broker,topic);
         if (dispatchPolicy != null) {
             topic.setDispatchPolicy(dispatchPolicy);
         }
         topic.setDeadLetterStrategy(getDeadLetterStrategy());
         if (subscriptionRecoveryPolicy != null) {
-            topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
+            SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy();
+            srp.setBroker(broker);
+            topic.setSubscriptionRecoveryPolicy(srp);
         }
         if (memoryLimit > 0) {
             topic.getMemoryUsage().setLimit(memoryLimit);
@@ -127,7 +129,7 @@ public class PolicyEntry extends Destina
         topic.setLazyDispatch(isLazyDispatch());
     }
     
-    public void baseConfiguration(BaseDestination destination) {
+    public void baseConfiguration(Broker broker,BaseDestination destination) {
         destination.setProducerFlowControl(isProducerFlowControl());
         destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
         destination.setEnableAudit(isEnableAudit());
@@ -148,7 +150,11 @@ public class PolicyEntry extends Destina
         destination.setMaxExpirePageSize(getMaxExpirePageSize());
         destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
         destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
-        destination.setSlowConsumerStrategy(getSlowConsumerStrategy());
+        SlowConsumerStrategy scs = getSlowConsumerStrategy();
+        if (scs != null) {
+            scs.setScheduler(broker.getScheduler());
+        }
+        destination.setSlowConsumerStrategy(scs);
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010
@@ -17,12 +17,11 @@
 package org.apache.activemq.broker.region.policy;
 
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
-
 import org.apache.activemq.ActiveMQMessageTransformation;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -50,9 +49,9 @@ public class QueryBasedSubscriptionRecov
     private static final Log LOG = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class);
 
     private MessageQuery query;
-    private AtomicLong messageSequence = new AtomicLong(0);
-    private IdGenerator idGenerator = new IdGenerator();
-    private ProducerId producerId = createProducerId();
+    private final AtomicLong messageSequence = new AtomicLong(0);
+    private final IdGenerator idGenerator = new IdGenerator();
+    private final ProducerId producerId = createProducerId();
 
     public SubscriptionRecoveryPolicy copy() {
         QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy();
@@ -99,6 +98,9 @@ public class QueryBasedSubscriptionRecov
     public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception {
         return new org.apache.activemq.command.Message[0];
     }
+    
+    public void setBroker(Broker broker) {        // intentionally empty: the broker argument is not stored or used
+    }
 
     protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) {
         try {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java Thu May 20 12:02:10 2010
@@ -2,6 +2,7 @@ package org.apache.activemq.broker.regio
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.thread.Scheduler;
 
 /*
  * a strategy for dealing with slow consumers
@@ -9,5 +10,6 @@ import org.apache.activemq.broker.region
 public interface SlowConsumerStrategy {
 
     void slowConsumer(ConnectionContext context, Subscription subs);
+    void setScheduler(Scheduler scheduler);
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
 
 
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.SubscriptionRecovery;
@@ -69,4 +70,6 @@ public interface SubscriptionRecoveryPol
      * @return the copy
      */
     SubscriptionRecoveryPolicy copy();
+    
+    void setBroker(Broker broker);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java Thu May 20 12:02:10 2010
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.SubscriptionRecovery;
@@ -28,7 +29,6 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.thread.Scheduler;
 
 /**
@@ -42,7 +42,7 @@ import org.apache.activemq.thread.Schedu
 public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
 
     private static final int GC_INTERVAL = 1000;
-    protected static final Scheduler scheduler = Scheduler.getInstance();
+    private Scheduler scheduler;
     
     // TODO: need to get a better synchronized linked list that has little
     // contention between enqueuing and dequeuing
@@ -89,6 +89,10 @@ public class TimedSubscriptionRecoveryPo
             }
         }
     }
+    
+    public void setBroker(Broker broker) {  // NOTE(review): start()/stop() dereference scheduler, which has no other initializer - call this first
+        this.scheduler = broker.getScheduler();
+    }
 
     public void start() throws Exception {
         scheduler.executePeriodically(gcTask, GC_INTERVAL);
@@ -97,6 +101,7 @@ public class TimedSubscriptionRecoveryPo
     public void stop() throws Exception {
         scheduler.cancel(gcTask);
     }
+    
 
     public void gc() {
         lastGCRun = System.currentTimeMillis();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Thu May 20 12:02:10 2010
@@ -35,7 +35,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
 import org.apache.activemq.thread.Scheduler;
@@ -75,7 +74,7 @@ public class AsyncDataManager {
     public static final int PREFERED_DIFF = 1024 * 512;
 
     private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
-    protected static Scheduler scheduler  = Scheduler.getInstance();
+    protected Scheduler scheduler;
 
     protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
 
@@ -193,7 +192,13 @@ public class AsyncDataManager {
                 cleanup();
             }
         };
-        scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
+        this.scheduler = new Scheduler("AsyncDataManager Scheduler");
+        try {
+            this.scheduler.start();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        this.scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
     }
 
     public void lock() throws IOException {
@@ -328,7 +333,12 @@ public class AsyncDataManager {
         if (!started) {
             return;
         }
-        scheduler.cancel(cleanupTask);
+        this.scheduler.cancel(cleanupTask);
+        try {
+            this.scheduler.stop();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
         accessorPool.close();
         storeState(false);
         appender.close();
@@ -376,7 +386,7 @@ public class AsyncDataManager {
     public synchronized void addInterestInFile(int file) throws IOException {
         if (file >= 0) {
             Integer key = Integer.valueOf(file);
-            DataFile dataFile = (DataFile)fileMap.get(key);
+            DataFile dataFile = fileMap.get(key);
             if (dataFile == null) {
                 throw new IOException("That data file does not exist");
             }
@@ -393,7 +403,7 @@ public class AsyncDataManager {
     public synchronized void removeInterestInFile(int file) throws IOException {
         if (file >= 0) {
             Integer key = Integer.valueOf(file);
-            DataFile dataFile = (DataFile)fileMap.get(key);
+            DataFile dataFile = fileMap.get(key);
             removeInterestInFile(dataFile);
         }
        
@@ -414,7 +424,7 @@ public class AsyncDataManager {
                 
         List<DataFile> purgeList = new ArrayList<DataFile>();
         for (Integer key : unUsed) {
-            DataFile dataFile = (DataFile)fileMap.get(key);
+            DataFile dataFile = fileMap.get(key);
             purgeList.add(dataFile);
         }
         for (DataFile dataFile : purgeList) {
@@ -432,7 +442,7 @@ public class AsyncDataManager {
         for (Integer key : unUsed) {
         	// Only add files less than the lastFile..
         	if( key.intValue() < lastFile.intValue() ) {
-                DataFile dataFile = (DataFile)fileMap.get(key);
+                DataFile dataFile = fileMap.get(key);
                 purgeList.add(dataFile);
         	}
         }
@@ -499,6 +509,7 @@ public class AsyncDataManager {
         this.maxFileLength = maxFileLength;
     }
 
+    @Override
     public String toString() {
         return "DataManager:(" + filePrefix + ")";
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=946600&r1=946599&r2=946600&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu May 20 12:02:10 2010
@@ -30,7 +30,6 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.activeio.journal.Journal;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -58,7 +57,6 @@ import org.apache.activemq.store.TopicMe
 import org.apache.activemq.store.TopicReferenceStore;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
@@ -85,7 +83,7 @@ import org.apache.commons.logging.LogFac
 public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
 
     private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
-    private static final Scheduler scheduler = Scheduler.getInstance();
+    private Scheduler scheduler;
     private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
     private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
     private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
@@ -99,7 +97,7 @@ public class AMQPersistenceAdapter imple
     private SystemUsage usageManager;
     private long checkpointInterval = 1000 * 20;
     private int maxCheckpointMessageAddSize = 1024 * 4;
-    private AMQTransactionStore transactionStore = new AMQTransactionStore(this);
+    private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
     private TaskRunner checkpointTask;
     private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
     private final AtomicBoolean started = new AtomicBoolean(false);
@@ -112,7 +110,7 @@ public class AMQPersistenceAdapter imple
     private File directory;
     private File directoryArchive;
     private BrokerService brokerService;
-    private AtomicLong storeSize = new AtomicLong();
+    private final AtomicLong storeSize = new AtomicLong();
     private boolean persistentIndex=true;
     private boolean useNio = true;
     private boolean archiveDataLogs=false;
@@ -124,8 +122,7 @@ public class AMQPersistenceAdapter imple
     private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
     private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
     private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
-    private Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
-    private String directoryPath = "";
+    private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
     private RandomAccessFile lockFile;
     private FileLock lock;
     private boolean disableLocking = DISABLE_LOCKING;
@@ -134,6 +131,8 @@ public class AMQPersistenceAdapter imple
     private boolean lockAquired;
     private boolean recoverReferenceStore=true;
     private boolean forceRecoverReferenceStore=false;
+    private boolean useDedicatedTaskRunner=false;
+    private int journalThreadPriority = Thread.MAX_PRIORITY;
 
     public String getBrokerName() {
         return this.brokerName;
@@ -165,12 +164,19 @@ public class AMQPersistenceAdapter imple
             } else {
                 this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
                 this.directory = new File(directory, "amqstore");
-                this.directoryPath=directory.getAbsolutePath();
             }
         }
         if (this.directoryArchive == null) {
             this.directoryArchive = new File(this.directory,"archive");
         }
+        // Embedded: reuse the broker's task runner factory. Standalone: create our own factory and scheduler.
+        if (this.brokerService != null) {
+            this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
+        } else {
+            this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
+                    true, 1000, isUseDedicatedTaskRunner());
+            this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
+        }
         IOHelper.mkdirs(this.directory);
         lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
         lock();
@@ -192,10 +198,11 @@ public class AMQPersistenceAdapter imple
         referenceStoreAdapter.setBrokerName(getBrokerName());
         referenceStoreAdapter.setUsageManager(usageManager);
         referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
-        if (taskRunnerFactory == null) {
-            taskRunnerFactory = createTaskRunnerFactory();
-        }
         
+        if (brokerService != null) {
+            this.scheduler = this.brokerService.getBroker().getScheduler();
+        }
+                
         if (failIfJournalIsLocked) {
             asyncDataManager.lock();
         } else {
@@ -389,7 +396,7 @@ public class AMQPersistenceAdapter imple
             Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
             while (queueIterator.hasNext()) {
                 final AMQMessageStore ms = queueIterator.next();
-                Location mark = (Location)ms.getMark();
+                Location mark = ms.getMark();
                 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
                     newMark = mark;
                 }
@@ -397,7 +404,7 @@ public class AMQPersistenceAdapter imple
             Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
             while (topicIterator.hasNext()) {
                 final AMQTopicMessageStore ms = topicIterator.next();
-                Location mark = (Location)ms.getMark();
+                Location mark = ms.getMark();
                 if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
                     newMark = mark;
                 }
@@ -726,6 +733,7 @@ public class AMQPersistenceAdapter imple
         deleteAllMessages = true;
     }
 
+    @Override
     public String toString() {
         return "AMQPersistenceAdapter(" + directory + ")";
     }
@@ -754,10 +762,6 @@ public class AMQPersistenceAdapter imple
         return adaptor;
     }
 
-    protected TaskRunnerFactory createTaskRunnerFactory() {
-        return DefaultThreadPools.getDefaultTaskRunnerFactory();
-    }
-
     // /////////////////////////////////////////////////////////////////
     // Property Accessors
     // /////////////////////////////////////////////////////////////////
@@ -991,6 +995,28 @@ public class AMQPersistenceAdapter imple
     public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
         this.forceRecoverReferenceStore = forceRecoverReferenceStore;
     }
+    
+    public boolean isUseDedicatedTaskRunner() {
+        return useDedicatedTaskRunner;
+    }
+    
+    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
+        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
+    }
+    
+    /**
+     * @return the journalThreadPriority
+     */
+    public int getJournalThreadPriority() {
+        return this.journalThreadPriority;
+    }
+
+    /**
+     * @param journalThreadPriority the journalThreadPriority to set
+     */
+    public void setJournalThreadPriority(int journalThreadPriority) {
+        this.journalThreadPriority = journalThreadPriority;
+    }
 
 	
 	protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {