You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/28 07:54:58 UTC

svn commit: r397774 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/memory/ main/java/org/apache/activemq/store/journal/ main/java/org/apache/activemq/...

Author: chirino
Date: Thu Apr 27 22:54:54 2006
New Revision: 397774

URL: http://svn.apache.org/viewcvs?rev=397774&view=rev
Log:
Since some OS/JVM combinations handle threads more efficiently than others, using a thread pool to run our async tasks may not
be the optimal solution.  Modified the TaskRunnerFactory so that it uses a system property to choose between the
PooledTaskRunner and the DedicatedTaskRunner, which keeps a dedicated thread per task.

The default is still set to use the PooledTaskRunner, but we may change this if performance benchmarks indicate that DedicatedTaskRunner should be the default.

Also made the thread names a little more uniform so that when you use a debugger you can easily tell what each thread is doing.



Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
Removed:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/SimpleTaskRunner.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Apr 27 22:54:54 2006
@@ -87,7 +87,6 @@
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
 import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
 import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
@@ -97,8 +96,8 @@
 
 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable,  StreamConnection, TransportListener {
 
-    public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
-    private final Executor asyncConnectionThread;
+    public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("ActiveMQ Session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
+    private final ThreadPoolExecutor asyncConnectionThread;
 
     private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
     private static final IdGenerator connectionIdGenerator = new IdGenerator();
@@ -168,16 +167,17 @@
      * @param password
      * @throws Exception 
      */
-    protected ActiveMQConnection(Transport transport, JMSStatsImpl factoryStats)
+    protected ActiveMQConnection(final Transport transport, JMSStatsImpl factoryStats)
             throws Exception {
        
-	// Configure a single threaded executor who's core thread can timeout if idle
+        // Configure a single threaded executor who's core thread can timeout if idle
         asyncConnectionThread = new ThreadPoolExecutor(1,1,5,TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
             public Thread newThread(Runnable r) {
-                return new Thread(r, "Connection task");
+                Thread thread = new Thread(r, "AcitveMQ Connection Worker: "+transport);
+                thread.setDaemon(true);
+                return thread;
             }});
-        // Todo: see if we can allow the core threads to timeout.
-        // asyncConnectionThread.allowCoreThreadTimeOut(true);
+        asyncConnectionThread.allowCoreThreadTimeOut(true);
         
         this.info = new ConnectionInfo(new ConnectionId(connectionIdGenerator.generateId()));
         this.info.setManageable(true);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Apr 27 22:54:54 2006
@@ -535,6 +535,8 @@
 
     synchronized public void dispose() throws JMSException {
         if (!closed) {
+
+            executor.stop();
             
             for (Iterator iter = consumers.iterator(); iter.hasNext();) {
                 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) iter.next();
@@ -559,6 +561,7 @@
             connection.removeSession(this);
             this.transactionContext=null;
             closed = true;
+            
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Thu Apr 27 22:54:54 2006
@@ -61,7 +61,7 @@
     }
 
     private void wakeup() {
-        if( !dispatchedBySessionPool && hasUncomsumedMessages() ) {
+        if( taskRunner!=null && !dispatchedBySessionPool && hasUncomsumedMessages() ) {
             try {
                 taskRunner.wakeup();
             } catch (InterruptedException e) {
@@ -101,7 +101,7 @@
     synchronized void start() {
         if( !messageQueue.isRunning() ) {
             messageQueue.start();
-            taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this);
+            taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId());
             wakeup();
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Thu Apr 27 22:54:54 2006
@@ -122,7 +122,7 @@
         }
         
         if( taskRunnerFactory != null ) {
-            taskRunner = taskRunnerFactory.createTaskRunner( this );
+            taskRunner = taskRunnerFactory.createTaskRunner( this, "ActiveMQ Connection Dispatcher: "+System.identityHashCode(this) );
         }
         else { 
             taskRunner = null;
@@ -145,6 +145,10 @@
         if(disposed)
             return;
         disposed=true;
+        
+        if( taskRunner!=null )
+            taskRunner.shutdown();
+        
         //
         // Remove all logical connection associated with this connection
         // from the broker.

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Thu Apr 27 22:54:54 2006
@@ -300,4 +300,7 @@
         this.name = name;
     }
 
+    public String toString() {
+        return getName();
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportStatusDetector.java Thu Apr 27 22:54:54 2006
@@ -103,7 +103,7 @@
     }
     public void start() throws Exception{
         if(started.compareAndSet(false,true)){
-            runner=new Thread(this,"Transport Status Dector "+connector);
+            runner=new Thread(this,"ActiveMQ Transport Status Monitor: "+connector);
             runner.setDaemon(true);
             runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
             runner.start();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java Thu Apr 27 22:54:54 2006
@@ -46,7 +46,7 @@
             public boolean iterate() {
                 return evictMessages();
             }
-        });
+        }, "Cache Evictor: "+System.identityHashCode(this));
     }
     
     private boolean evictMessages() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu Apr 27 22:54:54 2006
@@ -120,7 +120,7 @@
             public boolean iterate() {
                 return doCheckpoint();
             }
-        });
+        }, "ActiveMQ Journal Checkpoint Worker");
 
         this.longTermPersistence = longTermPersistence;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java Thu Apr 27 22:54:54 2006
@@ -120,7 +120,7 @@
             public boolean iterate() {
                 return doCheckpoint();
             }
-        });
+        }, "ActiveMQ Checkpoint Worker");
 
         this.longTermPersistence = longTermPersistence;
     }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java?rev=397774&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java Thu Apr 27 22:54:54 2006
@@ -0,0 +1,134 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.thread;
+
+
+/**
+ * A {@link TaskRunner} that runs a single {@link Task} on a thread of its own.
+ * The thread is started by the constructor.  It calls {@link Task#iterate()}
+ * repeatedly and, whenever that returns false, sleeps until {@link #wakeup()}
+ * or {@link #shutdown()} is called.
+ *
+ * @version $Revision: 1.1 $
+ */
+class DedicatedTaskRunner implements TaskRunner {
+
+    private final Task task;
+    private final Thread thread;
+
+    // Monitor guarding the three flags below; also used for wait/notify.
+    private final Object mutex = new Object();
+    private boolean threadTerminated;
+    private boolean pending;
+    private boolean shutdown;
+
+    /**
+     * Creates the runner and immediately starts its thread.
+     *
+     * @param task the task to iterate
+     * @param name the name of the dedicated thread
+     * @param priority the priority of the dedicated thread
+     * @param daemon whether the dedicated thread is a daemon thread
+     */
+    public DedicatedTaskRunner(Task task, String name, int priority, boolean daemon) {
+        this.task = task;
+        thread = new Thread(name) {
+            public void run() {
+                runTask();
+            }
+        };
+        thread.setDaemon(daemon);
+        thread.setPriority(priority);
+        thread.start();
+    }
+
+    /**
+     * Asks the dedicated thread to iterate the task again.  Does nothing
+     * once {@link #shutdown()} has been called.
+     */
+    public void wakeup() throws InterruptedException {
+        synchronized (mutex) {
+            if (shutdown) {
+                return;
+            }
+            pending = true;
+            mutex.notifyAll();
+        }
+    }
+
+    /**
+     * Shuts down the task and waits for the dedicated thread to stop.
+     * When called on the dedicated thread itself (for example from within
+     * {@link Task#iterate()}) it only flags the shutdown and returns, because
+     * a thread waiting for its own termination would block forever.
+     *
+     * @throws InterruptedException if interrupted while waiting for the thread to stop
+     */
+    public void shutdown() throws InterruptedException {
+        synchronized (mutex) {
+            shutdown = true;
+            pending = true;
+            mutex.notifyAll();
+
+            if (Thread.currentThread() == thread) {
+                return;
+            }
+
+            // Re-check in a loop: wait() may return spuriously, or because a
+            // concurrent shutdown() caller issued the notifyAll() above.
+            while (!threadTerminated) {
+                mutex.wait();
+            }
+        }
+    }
+
+    /**
+     * Body of the dedicated thread: iterates the task until shutdown is
+     * requested or the thread is interrupted.
+     */
+    private void runTask() {
+        try {
+            while (true) {
+
+                synchronized (mutex) {
+                    pending = false;
+                    if (shutdown) {
+                        return;
+                    }
+                }
+
+                if (!task.iterate()) {
+                    // No more work for now: sleep until wakeup() or shutdown().
+                    synchronized (mutex) {
+                        while (!pending) {
+                            mutex.wait();
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            // Someone really wants this thread to die off.
+        } finally {
+            // Make sure we notify any waiting threads that thread
+            // has terminated.
+            synchronized (mutex) {
+                threadTerminated = true;
+                mutex.notifyAll();
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java Thu Apr 27 22:54:54 2006
@@ -37,7 +37,7 @@
         });
     }
     
-    private static final TaskRunnerFactory defaultTaskRunnerFactory = new TaskRunnerFactory(defaultPool,10);
+    private static final TaskRunnerFactory defaultTaskRunnerFactory = new TaskRunnerFactory();
     
     public static Executor getDefaultPool() {
         return defaultPool;

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java?rev=397774&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java Thu Apr 27 22:54:54 2006
@@ -0,0 +1,156 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.thread;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+
+/**
+ * A {@link TaskRunner} that runs its {@link Task} on threads obtained from an
+ * {@link Executor}.  Each run makes at most maxIterationsPerRun calls to
+ * {@link Task#iterate()} and re-submits itself to the executor when more work
+ * remains or a {@link #wakeup()} arrived in the meantime.
+ *
+ * @version $Revision: 1.1 $
+ */
+class PooledTaskRunner implements TaskRunner {
+
+    private final int maxIterationsPerRun;
+    private final Executor executor;
+    private final Task task;
+    // The job submitted to the executor; its monitor guards the fields below.
+    private final Runnable runable;
+    private boolean queued;
+    private boolean shutdown;
+    private boolean iterating;
+    // The thread currently iterating the task, or null when none is.
+    private Thread runningThread;
+
+    /**
+     * @param executor the executor the task is submitted to
+     * @param task the task to iterate
+     * @param maxIterationsPerRun the maximum number of iterate() calls per submission
+     */
+    public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) {
+        this.executor = executor;
+        this.maxIterationsPerRun = maxIterationsPerRun;
+        this.task = task;
+        runable = new Runnable() {
+            public void run() {
+                runTask();
+            }
+        };
+    }
+
+    /**
+     * We Expect MANY wakeup calls on the same TaskRunner.
+     */
+    public void wakeup() throws InterruptedException {
+        synchronized (runable) {
+
+            // When we get in here, we make some assumptions of state:
+            // queued=false, iterating=false: wakeup() has not been called and therefore task is not executing.
+            // queued=true,  iterating=false: wakeup() was called, but task execution has not started yet.
+            // queued=false, iterating=true : wakeup() was called, which caused task execution to start.
+            // queued=true,  iterating=true : wakeup() called after task execution was started.
+
+            if (queued || shutdown) {
+                return;
+            }
+
+            queued = true;
+
+            // The runTask() method will do this for me once we are done iterating.
+            if (!iterating) {
+                executor.execute(runable);
+            }
+        }
+    }
+
+    /**
+     * Shuts down the task and waits for an iteration in progress to finish.
+     *
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public void shutdown() throws InterruptedException {
+        synchronized (runable) {
+            shutdown = true;
+            // The check on the thread is done because a call to iterate()
+            // can result in shutdown() being called, which would otherwise
+            // wait forever for its own iteration to finish.
+            if (runningThread != Thread.currentThread()) {
+                while (iterating) {
+                    runable.wait();
+                }
+            }
+        }
+    }
+
+    /**
+     * One submission's worth of work, executed on an executor thread.
+     */
+    private void runTask() {
+
+        synchronized (runable) {
+            queued = false;
+            if (shutdown) {
+                iterating = false;
+                runable.notifyAll();
+                return;
+            }
+            iterating = true;
+            // Recorded under the monitor, and cleared below before the next
+            // submission, so a follow-up run on another thread cannot have
+            // its entry overwritten by this run finishing late.
+            runningThread = Thread.currentThread();
+        }
+
+        // Don't synchronize while we are iterating so that
+        // multiple wakeup() calls can be executed concurrently.
+        boolean done = false;
+        boolean failed = true;
+        try {
+            for (int i = 0; i < maxIterationsPerRun; i++) {
+                if (!task.iterate()) {
+                    done = true;
+                    break;
+                }
+            }
+            failed = false;
+        } finally {
+            // Runs even when iterate() throws, so that the runner is never
+            // left marked as iterating with shutdown() waiting on it.
+            synchronized (runable) {
+                iterating = false;
+                runningThread = null;
+                if (shutdown) {
+                    queued = false;
+                    runable.notifyAll();
+                } else {
+                    // If we could not iterate all the items
+                    // then we need to re-queue.
+                    if (!done && !failed) {
+                        queued = true;
+                    }
+
+                    if (queued) {
+                        executor.execute(runable);
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Thu Apr 27 22:54:54 2006
@@ -37,43 +37,42 @@
 public class TaskRunnerFactory {
 
     private Executor executor;
-    private int maxIterationsPerRun = 1000;
+    private int maxIterationsPerRun;
+    private String name;
+    private int priority;
+    private boolean daemon;
 
     public TaskRunnerFactory() {
-        setExecutor(createDefaultExecutor("ActiveMQ Task", Thread.NORM_PRIORITY, true));
+        this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
     }
 
     public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
+        
+        this.name = name;
+        this.priority = priority;
+        this.daemon = daemon;
         this.maxIterationsPerRun = maxIterationsPerRun;
-        setExecutor(createDefaultExecutor(name, priority, daemon));
-    }
-
-    public TaskRunnerFactory(Executor executor, int maxIterationsPerRun) {
-        this.executor = executor;
-        this.maxIterationsPerRun = maxIterationsPerRun;
-    }
-
-    public TaskRunner createTaskRunner(Task task) {
-        return new SimpleTaskRunner(executor, task, maxIterationsPerRun);
-    }
+        
+        // If your OS/JVM combination has a good thread model, you may want to avoid 
+        // using a thread pool to run tasks and use a DedicatedTaskRunner instead.
+        if( "true".equals(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner")) ) {
+            executor = null;
+        } else {
+            executor = createDefaultExecutor();
+        }
     
-    public Executor getExecutor() {
-        return executor;
     }
 
-    public void setExecutor(Executor executor) {
-        this.executor = executor;
+    public TaskRunner createTaskRunner(Task task, String name) {
+        if( executor!=null ) {
+            return new PooledTaskRunner(executor, task, maxIterationsPerRun);
+        } else {
+            return new DedicatedTaskRunner(task, name, priority, daemon);
+        }
     }
-
-    public int getMaxIterationsPerRun() {
-        return maxIterationsPerRun;
-    }
-
-    public void setMaxIterationsPerRun(int maxIterationsPerRun) {
-        this.maxIterationsPerRun = maxIterationsPerRun;
-    }
-
-    protected Executor createDefaultExecutor(final String name, final int priority, final boolean daemon) {
+    
+    protected Executor createDefaultExecutor() {
+        
         ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
                 Thread thread = new Thread(runnable, name);
@@ -84,6 +83,7 @@
         });
         rc.allowCoreThreadTimeOut(true);
         return rc;
+            
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Thu Apr 27 22:54:54 2006
@@ -69,7 +69,7 @@
 
     protected void doStart() throws Exception {
         log.info("Listening for connections at: " + getLocation());
-        runner = new Thread(this, toString());
+        runner = new Thread(this, "ActiveMQ Transport Server: "+toString());
         runner.setDaemon(daemon);
         runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);
         runner.start();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java Thu Apr 27 22:54:54 2006
@@ -38,7 +38,7 @@
     }
 
     protected void doStart() throws Exception {
-        runner = new Thread(this, toString());
+        runner = new Thread(this, "ActiveMQ Transport: "+toString());
         runner.setDaemon(daemon);
         runner.start();
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Apr 27 22:54:54 2006
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Random;
+
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
@@ -30,7 +31,6 @@
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.transport.CompositeTransport;
-import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
@@ -40,6 +40,7 @@
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
@@ -214,7 +215,7 @@
                 return !disposed;
             }
 
-        });
+        }, "ActiveMQ Failover Worker: "+System.identityHashCode(this));
     }
 
     private void handleTransportFailure(IOException e) throws InterruptedException {
@@ -262,6 +263,7 @@
         synchronized(sleepMutex){
             sleepMutex.notifyAll();
         }
+        reconnectTask.shutdown();
     }
 
     public long getInitialReconnectDelay() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Apr 27 22:54:54 2006
@@ -152,7 +152,7 @@
             public boolean iterate() {
                 return doConnect();
             }
-        });
+        }, "ActiveMQ Fanout Worker: "+System.identityHashCode(this));
     }
     
     /**
@@ -294,7 +294,6 @@
             log.debug("Stopped: "+this);
             ss.throwFirstException();
         }
-
         reconnectTask.shutdown();
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Thu Apr 27 22:54:54 2006
@@ -156,7 +156,7 @@
      * @return pretty print of this
      */
     public String toString() {
-        return "TcpTransportServer@" + getLocation();
+        return ""+getLocation();
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java?rev=397774&r1=397773&r2=397774&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/thread/TaskRunnerTest.java Thu Apr 27 22:54:54 2006
@@ -32,6 +32,17 @@
     private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
             .getLog(TaskRunnerTest.class);
 
+    
+    public void testWakeupPooled() throws InterruptedException, BrokenBarrierException {
+        System.setProperty("org.apache.activemq.UseDedicatedTaskRunner", "false");
+        doTestWakeup();
+    }
+    
+    public void testWakeupDedicated() throws InterruptedException, BrokenBarrierException {
+        System.setProperty("org.apache.activemq.UseDedicatedTaskRunner", "true");
+        doTestWakeup();
+    }
+    
     /**
      * Simulate multiple threads queuing work for the
      * TaskRunner.  The Task Runner dequeues the 
@@ -40,7 +51,7 @@
      * @throws InterruptedException
      * @throws BrokenBarrierException 
      */
-    public void testWakeup() throws InterruptedException, BrokenBarrierException {
+    public void doTestWakeup() throws InterruptedException, BrokenBarrierException {
         
         final AtomicInteger iterations = new AtomicInteger(0);
         final AtomicInteger counter = new AtomicInteger(0);
@@ -64,7 +75,7 @@
                     return true;
                 }
             }
-        });
+        }, "Thread Name");
         
         long start = System.currentTimeMillis();
         final int WORKER_COUNT=5;
@@ -96,6 +107,8 @@
         log.info("Dequeues/s: "+(1000.0*ENQUEUE_COUNT/(end-start)));
         log.info("duration: "+((end-start)/1000.0));
         assertTrue(b);
+        
+        runner.shutdown();
     }