You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/06 17:28:36 UTC

svn commit: r887707 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/...

Author: chirino
Date: Sun Dec  6 16:28:35 2009
New Revision: 887707

URL: http://svn.apache.org/viewvc?rev=887707&view=rev
Log:
Dispatcher is now using the Retained interface to manage its lifecycle.


Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Sun Dec  6 16:28:35 2009
@@ -138,7 +138,7 @@
     		throw new IllegalStateException("Can only start a broker that is in the "+State.CONFIGURATION +" state.  Broker was "+state.get());
     	}
     	try {
-		    dispatcher.start();
+		    dispatcher.retain();
 
 	    	synchronized(virtualHosts) {
 			    for (VirtualHost virtualHost : virtualHosts.values()) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Sun Dec  6 16:28:35 2009
@@ -97,7 +97,7 @@
     @Before
     public void setUp() throws Exception {
         dispatcher = createDispatcher();
-        dispatcher.start();
+        dispatcher.retain();
         
         if (tcp) {
             sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Sun Dec  6 16:28:35 2009
@@ -64,7 +64,7 @@
     
     protected void startServices() throws Exception {
         dispatcher = createDispatcher();
-        dispatcher.start();
+        dispatcher.retain();
         database = new BrokerDatabase(createStore());
         database.setDispatcher(dispatcher);
         database.start();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java Sun Dec  6 16:28:35 2009
@@ -22,9 +22,6 @@
 
 public interface Dispatcher extends Retained {
     
-    public void start();
-    public void shutdown(Runnable onShutdown);
-    
     public DispatchQueue getGlobalQueue();
     public DispatchQueue getGlobalQueue(DispatchPriority priority);
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java Sun Dec  6 16:28:35 2009
@@ -24,6 +24,6 @@
     
     public void retain();
     public void release();
-    public void setShutdownHandler(Runnable shutdownHandler);
+    public void addShutdownWatcher(Runnable shutdownWatcher);
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java Sun Dec  6 16:28:35 2009
@@ -1,28 +1,49 @@
 package org.apache.activemq.dispatch.internal;
 
+import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class BaseRetained {
     
-    final protected AtomicInteger reatinCounter = new AtomicInteger(1);
-    final protected AtomicReference<Runnable> shutdownHandler = new AtomicReference<Runnable>();
+    final protected AtomicInteger reatinCounter = new AtomicInteger(0);
+    final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>();
 
-    public void setShutdownHandler(Runnable finalizer) {
-        this.shutdownHandler.set(finalizer);
+    public void addShutdownWatcher(Runnable shutdownHandler) {
+        synchronized(shutdownHandlers) {
+            shutdownHandlers.add(shutdownHandler);
+        }
     }
     
     public void retain() {
-        int prev = reatinCounter.getAndIncrement();
-        assert prev!=0;
+        if( reatinCounter.getAndIncrement() == 0 ) {
+            startup();
+        }
     }
 
     public void release() {
         if( reatinCounter.decrementAndGet()==0 ) {
-            Runnable value = shutdownHandler.getAndSet(null);
-            if( value!=null ) {
-                value.run();
-            }
+            shutdown();
+        }
+    }
+
+    /**
+     * Subclasses should override if they want to do some startup processing. 
+     */
+    protected void startup() {
+    }
+
+
+    /**
+     * Subclasses should override if they want to do clean up. 
+     */
+    protected void shutdown() {
+        ArrayList<Runnable> copy;
+        synchronized(shutdownHandlers) {
+            copy = new ArrayList<Runnable>(shutdownHandlers);
+            shutdownHandlers.clear();
+        }
+        for (Runnable runnable : copy) {
+            runnable.run();
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java Sun Dec  6 16:28:35 2009
@@ -22,12 +22,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.DispatcherFactory;
-
-import static org.apache.activemq.dispatch.DispatcherFactory.*;
-
-import static org.apache.activemq.dispatch.DispatcherFactory.*;
 
 /**
  * 
@@ -44,6 +38,7 @@
 
     public SerialDispatchQueue(String label) {
         this.label = label;
+        retain();
     }
 
     public String getLabel() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Sun Dec  6 16:28:35 2009
@@ -34,7 +34,7 @@
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
 
-public class AdvancedDispatcher extends BaseRetained implements Dispatcher {
+final public class AdvancedDispatcher extends BaseRetained implements Dispatcher {
 
     public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
 
@@ -44,10 +44,6 @@
 
     private final ArrayList<DispatcherThread> dispatchers = new ArrayList<DispatcherThread>();
 
-    final AtomicInteger startCounter = new AtomicInteger();
-//    final AtomicBoolean started = new AtomicBoolean();
-//    final AtomicBoolean shutdown = new AtomicBoolean();
-
     private int roundRobinCounter = 0;
     private int size;
     private final int numPriorities;
@@ -67,54 +63,32 @@
     }
 
     /**
-     * Subclasses should implement this to return a new dispatcher.
-     * 
-     * @param name
-     *            The name to assign the dispatcher.
-     * @param pool
-     *            The pool.
-     * @return The new dispathcer.
-     */
-    protected DispatcherThread createDispatcher(String name) throws Exception {
-        return new DispatcherThread(this, name, numPriorities);
-    }
-
-    /**
      * @see org.apache.activemq.dispatch.internal.advanced.DispatcherThread#start()
      */
-    public synchronized final void start()  {
-        if( startCounter.getAndIncrement()==0 ) {
-            // Create all the workers.
-            try {
-                loadBalancer.start();
-                for (int i = 0; i < size; i++) {
-                    DispatcherThread dispatacher = createDispatcher("dispatcher -" + (i + 1));
-                    dispatchers.add(dispatacher);
-                    dispatacher.start();
-                }
-            } catch (Exception e) {
-                shutdown();
-            }
+    protected void startup()  {
+        loadBalancer.start();
+        for (int i = 0; i < size; i++) {
+            DispatcherThread dispatacher = new DispatcherThread(this, ("dispatcher -" + (i + 1)), numPriorities);
+            dispatchers.add(dispatacher);
+            dispatacher.start();
         }
     }
-
-    public final void shutdown() {
-        shutdown(null);
-    }
     
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
-     */
-    public final void shutdown(Runnable onShutdown) {
-        if( startCounter.decrementAndGet()==0 ) {
-            final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
-            for (DispatcherThread d : new ArrayList<DispatcherThread>(dispatchers)) {
-                d.shutdown(shutdownCountDown, onShutdown);
+    protected void shutdown() {
+        Runnable countDown = new Runnable() {
+            AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
+            public void run() {
+                if( shutdownCountDown.decrementAndGet()==0 ) {
+                    // Notify any registered shutdown watchers.
+                    AdvancedDispatcher.super.shutdown();
+                }
             }
-            loadBalancer.stop();
+        };
+
+        for (DispatcherThread d : new ArrayList<DispatcherThread>(dispatchers)) {
+            d.shutdown(countDown);
         }
+        loadBalancer.stop();
     }
 
     /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Sun Dec  6 16:28:35 2009
@@ -22,7 +22,6 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.util.Mapper;
@@ -30,8 +29,6 @@
 import org.apache.activemq.util.TimerHeap;
 import org.apache.activemq.util.list.LinkedNodeList;
 
-import static org.apache.activemq.dispatch.DispatcherFactory.*;
-
 public class DispatcherThread implements Runnable {
 
     static public final ThreadLocal<DispatcherThread> CURRENT = new ThreadLocal<DispatcherThread>();
@@ -124,27 +121,14 @@
             thread.start();
         }
     }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
-     */
-    public void shutdown() throws InterruptedException {
-        Thread joinThread = shutdown(new AtomicInteger(1), null);
-        if (joinThread != null) {
-            // thread.interrupt();
-            joinThread.join();
-        }
-    }
     
-    public Thread shutdown(final AtomicInteger shutdownCountDown, final Runnable onShutdown) {
+    public Thread shutdown(final Runnable onShutdown) {
         synchronized (this) {
             if (thread != null) {
                 dispatchInternal(new Runnable() {
                     public void run() {
                         running = false;
-                        if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+                        if( onShutdown!=null ) {
                             onShutdown.run();
                         }
                     }
@@ -153,7 +137,7 @@
                 thread = null;
                 return rc;
             } else {
-                if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+                if( onShutdown!=null) {
                     onShutdown.run();
                 }
             }
@@ -268,7 +252,7 @@
         foreignPermits.release();
     }
 
-    protected final void onForeignUpdate(DispatchContext context) {
+    protected void onForeignUpdate(DispatchContext context) {
         synchronized (foreignQueue) {
 
             ForeignEvent fe = context.updateEvent[foreignToggle];
@@ -281,7 +265,7 @@
         }
     }
 
-    protected final boolean removeDispatchContext(DispatchContext context) {
+    protected boolean removeDispatchContext(DispatchContext context) {
         synchronized (foreignQueue) {
 
             if (context.updateEvent[0].isLinked()) {
@@ -304,7 +288,7 @@
         return false;
     }
 
-    protected final boolean takeOwnership(DispatchContext context) {
+    protected boolean takeOwnership(DispatchContext context) {
         synchronized (this) {
             if (running) {
                 contexts.add(context);
@@ -316,13 +300,13 @@
     }
 
     //Special dispatch method that allow high priority dispatch:
-    private final void dispatchInternal(Runnable runnable, int priority) {
+    private void dispatchInternal(Runnable runnable, int priority) {
         DispatchContext context = new DispatchContext(this, runnable, false, name);
         context.priority = priority;
         context.requestDispatch();
     }
 
-    public final void dispatch(Runnable runnable, int priority) {
+    public void dispatch(Runnable runnable, int priority) {
         DispatchContext context = new DispatchContext(this, runnable, false, name);
         context.updatePriority(priority);
         context.requestDispatch();
@@ -343,11 +327,11 @@
         };
     }
 
-    public void execute(final Runnable runnable) {
+    public void execute(Runnable runnable) {
         dispatch(runnable, 0);
     }
     
-    public void execute(final Runnable runnable, int prio) {
+    public void execute(Runnable runnable, int prio) {
         dispatch(runnable, prio);
     }
 
@@ -368,7 +352,7 @@
         }
     }
     
-    final void add(ForeignEvent event) {
+    void add(ForeignEvent event) {
         synchronized (foreignQueue) {
             if (!event.isLinked()) {
                 foreignQueue[foreignToggle].addLast(event);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Sun Dec  6 16:28:35 2009
@@ -78,7 +78,7 @@
         throw new UnsupportedOperationException();
     }
 
-    public void setShutdownHandler(Runnable finalizer) {
+    public void addShutdownWatcher(Runnable finalizer) {
         throw new UnsupportedOperationException();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java Sun Dec  6 16:28:35 2009
@@ -78,7 +78,7 @@
         throw new UnsupportedOperationException();
     }
 
-    public void setShutdownHandler(Runnable finalizer) {
+    public void addShutdownWatcher(Runnable finalizer) {
         throw new UnsupportedOperationException();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Sun Dec  6 16:28:35 2009
@@ -85,7 +85,7 @@
         throw new UnsupportedOperationException();
     }
 
-    public void setShutdownHandler(Runnable finalizer) {
+    public void addShutdownWatcher(Runnable finalizer) {
         throw new UnsupportedOperationException();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java Sun Dec  6 16:28:35 2009
@@ -38,7 +38,7 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class SimpleDispatcher extends BaseRetained implements Dispatcher {
+final public class SimpleDispatcher extends BaseRetained implements Dispatcher {
         
     public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
 
@@ -49,7 +49,6 @@
     
     final ConcurrentLinkedQueue<DispatcherThread> waitingDispatchers = new ConcurrentLinkedQueue<DispatcherThread>();
     final AtomicInteger waitingDispatcherCount = new AtomicInteger();
-    final AtomicInteger startCounter = new AtomicInteger();
     private final String label;
     TimerThread timerThread;
     
@@ -109,35 +108,32 @@
         }
     }
 
-    public void start() {
-        if( startCounter.getAndIncrement()==0 ) {
-            for (int i = 0; i < dispatchers.length; i++) {
-                dispatchers[i] = new DispatcherThread(this, i);
-                dispatchers[i].start();
-            }
-            timerThread = new TimerThread(this);
-            timerThread.start();
+    protected void startup() {
+        for (int i = 0; i < dispatchers.length; i++) {
+            dispatchers[i] = new DispatcherThread(this, i);
+            dispatchers[i].start();
         }
+        timerThread = new TimerThread(this);
+        timerThread.start();
     }
 
-    public void shutdown(final Runnable onShutdown) {
-        if( startCounter.decrementAndGet()==0 ) {
-            
-            final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length+1);
-            Runnable wrapper = new Runnable() {
-                public void run() {
-                    if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
-                        onShutdown.run();
-                    }
-                    throw new DispatcherThread.Shutdown();
+    public void shutdown() {
+        
+        Runnable countDown = new Runnable() {
+            AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length+1);
+            public void run() {
+                if( shutdownCountDown.decrementAndGet()==0 ) {
+                    // Notify any registered shutdown watchers.
+                    SimpleDispatcher.super.shutdown();
                 }
-            };
-
-            timerThread.shutdown(wrapper);
-            for (int i = 0; i < dispatchers.length; i++) {
-                ThreadDispatchQueue queue = dispatchers[i].threadQueues[LOW.ordinal()];
-                queue.runnables.add(wrapper);
+                throw new DispatcherThread.Shutdown();
             }
+        };
+
+        timerThread.shutdown(countDown);
+        for (int i = 0; i < dispatchers.length; i++) {
+            ThreadDispatchQueue queue = dispatchers[i].threadQueues[LOW.ordinal()];
+            queue.runnables.add(countDown);
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Sun Dec  6 16:28:35 2009
@@ -112,7 +112,7 @@
         throw new UnsupportedOperationException();
     }
 
-    public void setShutdownHandler(Runnable finalizer) {
+    public void addShutdownWatcher(Runnable finalizer) {
         throw new UnsupportedOperationException();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java Sun Dec  6 16:28:35 2009
@@ -46,7 +46,7 @@
     public void testActorInvocation() throws Exception
     {
         Dispatcher advancedSystem = new AdvancedDispatcher(Runtime.getRuntime().availableProcessors(), 3);
-        advancedSystem.start();
+        advancedSystem.retain();
         
         DispatchQueue queue = advancedSystem.createSerialQueue("test");
         ActorTestObject testObject = Actor.create(new ActorTestObject(), queue);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Sun Dec  6 16:28:35 2009
@@ -34,22 +34,24 @@
 
     public static void main(String[] args) throws Exception {
         Dispatcher advancedSystem = new AdvancedDispatcher(Runtime.getRuntime().availableProcessors(), 3);
-        advancedSystem.start();
+        advancedSystem.retain();
         benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
         benchmark("advanced private serial queue", advancedSystem, advancedSystem.createSerialQueue("test"));
 
         RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
-        advancedSystem.shutdown(latch);
+        advancedSystem.addShutdownWatcher(latch);
+        advancedSystem.release();
         latch.await();
 
         Dispatcher simpleSystem = new SimpleDispatcher("test", Runtime.getRuntime().availableProcessors());
-        simpleSystem.start();
+        simpleSystem.retain();
         
         benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
         benchmark("simple private serial queue", simpleSystem, simpleSystem.createSerialQueue("test"));
 
         latch = new RunnableCountDownLatch(1);
-        simpleSystem.shutdown(latch);
+        advancedSystem.addShutdownWatcher(latch);
+        advancedSystem.release();
         latch.await();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Sun Dec  6 16:28:35 2009
@@ -30,21 +30,21 @@
 public class DispatcherPoolTest {
     
     public static void main(String[] args) throws Exception {
-        AdvancedDispatcher pooledDispatcher = new AdvancedDispatcher(Runtime.getRuntime().availableProcessors(), 3);
-        pooledDispatcher.start();
+        AdvancedDispatcher dispatcher = new AdvancedDispatcher(Runtime.getRuntime().availableProcessors(), 3);
+        dispatcher.retain();
         
         // warm the JIT up..
-        benchmarkWork(pooledDispatcher, 100000);
+        benchmarkWork(dispatcher, 100000);
         
         int iterations = 1000*1000*20;
         long start = System.nanoTime();
-        benchmarkWork(pooledDispatcher, iterations);
+        benchmarkWork(dispatcher, iterations);
         long end = System.nanoTime();
         
         double durationMS = 1.0d*(end-start)/1000000d;
         double rate = 1000d * iterations / durationMS;
         
-        pooledDispatcher.shutdown();
+        dispatcher.release();
         System.out.println(format("duration: %,.3f ms, rate: %,.2f executions/sec", durationMS, rate));
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Sun Dec  6 16:28:35 2009
@@ -90,7 +90,7 @@
 
     protected void startServices() throws Exception {
         dispatcher = createDispatcher();
-        dispatcher.start();
+        dispatcher.retain();
         database = new BrokerDatabase(createStore());
         database.setDispatcher(dispatcher);
         if( TEST_MAX_STORE_LATENCY ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Sun Dec  6 16:28:35 2009
@@ -88,7 +88,7 @@
         }
         
         stop();
-        dispatchQueue.setShutdownHandler(onShutdown);
+        dispatchQueue.addShutdownWatcher(onShutdown);
         dispatchQueue.release();
         dispatchQueue = null;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java Sun Dec  6 16:28:35 2009
@@ -210,7 +210,7 @@
         broker.setName("Broker");
         broker.createDispatcher();
         try {
-            broker.getDispatcher().start();
+            broker.getDispatcher().retain();
             broker.startServices();
         } catch (Exception e) {
             // TODO Auto-generated catch block

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java Sun Dec  6 16:28:35 2009
@@ -255,7 +255,7 @@
     private void createConnections(int destCount) throws Exception {
 
         dispatcher = createDispatcher("BrokerDispatcher");
-        dispatcher.start();
+        dispatcher.retain();
 
         if (multibroker) {
             sendBroker = createBroker("SendBroker", sendBrokerURI);
@@ -287,7 +287,7 @@
         Dispatcher clientDispatcher = null;
         if (SEPARATE_CLIENT_DISPATCHER) {
             clientDispatcher = createDispatcher("ClientDispatcher");
-            clientDispatcher.start();
+            clientDispatcher.retain();
         } else {
             clientDispatcher = dispatcher;
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java Sun Dec  6 16:28:35 2009
@@ -219,7 +219,7 @@
     }
 
     public void runTest() throws Exception {
-        getDispatcher().start();
+        getDispatcher().retain();
 
         // Start 'em up.
         startServices();
@@ -304,7 +304,7 @@
         System.out.println(IntrospectionSupport.toString(test));
         try
         {
-            test.getDispatcher().start();
+            test.getDispatcher().retain();
             test.createConnections();
             test.runTest();
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Sun Dec  6 16:28:35 2009
@@ -71,7 +71,7 @@
         	pipe.write(EOF_TOKEN);
             if (dispatchQueue != null) {
                 RunnableCountDownLatch done = new RunnableCountDownLatch(1);
-                dispatchQueue.setShutdownHandler(done);
+                dispatchQueue.addShutdownWatcher(done);
                 dispatchQueue.release();
                 done.await();
             } else {