You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/18 05:28:52 UTC

svn commit: r892128 [1/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-dispatcher/src/main/java/org/apache/activemq/dis...

Author: chirino
Date: Fri Dec 18 04:28:49 2009
New Revision: 892128

URL: http://svn.apache.org/viewvc?rev=892128&view=rev
Log:
Better lifecycle handling.


Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java
      - copied, changed from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java
      - copied, changed from r891866, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Fri Dec 18 04:28:49 2009
@@ -138,7 +138,7 @@
     		throw new IllegalStateException("Can only start a broker that is in the "+State.CONFIGURATION +" state.  Broker was "+state.get());
     	}
     	try {
-		    dispatcher.retain();
+		    dispatcher.resume();
 
 	    	synchronized(virtualHosts) {
 			    for (VirtualHost virtualHost : virtualHosts.values()) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Fri Dec 18 04:28:49 2009
@@ -97,7 +97,7 @@
     @Before
     public void setUp() throws Exception {
         dispatcher = createDispatcher();
-        dispatcher.retain();
+        dispatcher.resume();
         
         if (tcp) {
             sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Fri Dec 18 04:28:49 2009
@@ -64,7 +64,7 @@
     
     protected void startServices() throws Exception {
         dispatcher = createDispatcher();
-        dispatcher.retain();
+        dispatcher.resume();
         database = new BrokerDatabase(createStore());
         database.setDispatcher(dispatcher);
         database.start();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java Fri Dec 18 04:28:49 2009
@@ -20,14 +20,11 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public interface DispatchObject extends Retained {
+public interface DispatchObject extends Suspendable {
     
     public <Context> Context getContext();
     public <Context> void setContext(Context context);
 
-    public void suspend();
-    public void resume();
-
     public void setTargetQueue(DispatchQueue queue);
     public DispatchQueue getTargetQueue();
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Fri Dec 18 04:28:49 2009
@@ -30,7 +30,7 @@
 
     private static Dispatcher create() {
         Dispatcher rc = new DispatcherConfig().createDispatcher();
-        rc.retain();
+        rc.resume();
         return rc;
     }
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java Fri Dec 18 04:28:49 2009
@@ -23,7 +23,7 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public interface Dispatcher extends Retained {
+public interface Dispatcher extends Suspendable {
 
     public DispatchQueue getGlobalQueue();
     public DispatchQueue getGlobalQueue(DispatchPriority priority);

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java (from r891451, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java&r1=891451&r2=892128&rev=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retainable.java Fri Dec 18 04:28:49 2009
@@ -20,7 +20,7 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public interface Retained {
+public interface Retainable {
     
     public void retain();
     public void release();

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Suspendable.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,6 @@
+package org.apache.activemq.dispatch;
+
+public interface Suspendable extends Retainable {
+    public void suspend();
+    public void resume();
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Fri Dec 18 04:28:49 2009
@@ -25,40 +25,30 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract public class AbstractDispatchObject extends BaseRetained implements DispatchObject {
+abstract public class AbstractDispatchObject extends BaseSuspendable implements DispatchObject {
 
     protected volatile Object context;
     protected volatile DispatchQueue targetQueue;
-    protected final AtomicInteger suspendCounter = new AtomicInteger();
 
     @SuppressWarnings("unchecked")
     public <Context> Context getContext() {
+       assertRetained();
         return (Context) context;
     }
     
     public <Context> void setContext(Context context) {
+       assertRetained();
         this.context = context;
     }
 
     public void setTargetQueue(DispatchQueue targetQueue) {
+       assertRetained();
         this.targetQueue = targetQueue;
     }
 
     public DispatchQueue getTargetQueue() {
+       assertRetained();
         return this.targetQueue;
     }
-    
-    public void resume() {
-        if( suspendCounter.decrementAndGet() == 0 ) {
-            onResume();
-        }
-    }
-
-    public void suspend() {
-        suspendCounter.incrementAndGet();
-    }
-    
-    protected void onResume() {
-    }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java Fri Dec 18 04:28:49 2009
@@ -39,7 +39,7 @@
     protected final AtomicInteger executeCounter = new AtomicInteger();
     
     protected final AtomicLong externalQueueSize = new AtomicLong();
-    protected final AtomicLong queueSize = new AtomicLong();
+    protected final AtomicLong size = new AtomicLong();
     protected final ConcurrentLinkedQueue<Runnable> externalQueue = new ConcurrentLinkedQueue<Runnable>();
 
     private final LinkedList<Runnable> localQueue = new LinkedList<Runnable>();
@@ -50,7 +50,6 @@
     public AbstractSerialDispatchQueue(String label, DispatchOption...options) {
         this.label = label;
         this.options = set(options);
-        retain();
     }
 
     static private Set<DispatchOption> set(DispatchOption[] options) {
@@ -64,28 +63,37 @@
     }
 
     @Override
+    protected void onStartup() {
+        dispatchSelfAsync();
+    }
+
+    @Override
     protected void onResume() {
         dispatchSelfAsync();
     }
 
     public void execute(Runnable command) {
+       assertRetained();
         dispatchAsync(command);
     }
 
     public void dispatchAsync(Runnable runnable) {
         assert runnable != null;
+       assertRetained();
 
-        if( queueSize.getAndIncrement()==0 ) {
-            retain();
-        }
+        long sizeWas = size.getAndIncrement();
 
         // We can take a shortcut...
         if( executing.get()!=null ) {
             localQueue.add(runnable);
         } else {
+            if( sizeWas==0 ) {
+                retain();
+            }
+
             long lastSize = externalQueueSize.getAndIncrement();
             externalQueue.add(runnable);
-            if( lastSize == 0 && suspendCounter.get()<=0 ) {
+            if( lastSize == 0 && suspended.get()<=0 ) {
                 dispatchSelfAsync();
             }
         }
@@ -122,7 +130,7 @@
         try {
             
             Runnable runnable;
-            while( suspendCounter.get() <= 0 ) {
+            while( suspended.get() <= 0 ) {
                 
                 if( (runnable = localQueue.poll())!=null ) {
                     counter++;
@@ -153,24 +161,30 @@
             }
             
         } finally {
-            long size = queueSize.addAndGet(-counter);
-            if( size==0 ) { 
-                release();
-            } else {
-                dispatchSelfAsync();
+            if( counter>0 ) {
+                long lsize = size.addAndGet(-counter);
+                assert lsize >= 0;
+                if( lsize==0 ) {
+                    release();
+                } else {
+                    dispatchSelfAsync();
+                }
             }
         }
     }
 
     public void dispatchSync(Runnable runnable) throws InterruptedException {
+       assertRetained();
         dispatchApply(1, runnable);
     }
     
     public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException {
+       assertRetained();
         QueueSupport.dispatchApply(this, iterations, runnable);
     }
 
     public Set<DispatchOption> getOptions() {
+       assertRetained();
         return options;
     }
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java (from r891866, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java&r1=891866&r2=892128&rev=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetainable.java Fri Dec 18 04:28:49 2009
@@ -19,51 +19,50 @@
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static java.lang.String.*;
+
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class BaseRetained {
-    
-    final protected AtomicInteger retainCounter = new AtomicInteger(0);
+public class BaseRetainable {
+
+    final protected AtomicInteger retained = new AtomicInteger(1);
     final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>(1);
 
     public void addShutdownWatcher(Runnable shutdownHandler) {
-        synchronized(shutdownHandlers) {
+        assertRetained();
+        synchronized (shutdownHandlers) {
             shutdownHandlers.add(shutdownHandler);
         }
     }
-    
+
     public void retain() {
-        if( retainCounter.getAndIncrement() == 0 ) {
-            startup();
-        }
+        assertRetained();
+        retained.getAndIncrement();
     }
 
     public void release() {
-        if( retainCounter.decrementAndGet()==0 ) {
-            shutdown();
+        assertRetained();
+        if (retained.decrementAndGet() == 0) {
+            onShutdown();
         }
     }
-    
-    protected boolean isShutdown()
-    {
-        return retainCounter.get() <= 0;
-    }
 
-    /**
-     * Subclasses should override if they want to do some startup processing. 
-     */
-    protected void startup() {
+    final protected void assertRetained() {
+        assert retained.get() > 0 : format("%s: Use of object not allowed after it has been released", this.toString());
     }
 
+    public boolean isShutdown() {
+        return retained.get() <= 0;
+    }
 
     /**
-     * Subclasses should override if they want to do clean up. 
+     * Subclasses should override if they want to do clean up.
      */
-    protected void shutdown() {
+    protected void onShutdown() {
         ArrayList<Runnable> copy;
-        synchronized(shutdownHandlers) {
+        synchronized (shutdownHandlers) {
             copy = new ArrayList<Runnable>(shutdownHandlers);
             shutdownHandlers.clear();
         }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseSuspendable.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.dispatch.Suspendable;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class BaseSuspendable extends BaseRetainable implements Suspendable {
+    
+    protected final AtomicBoolean startup = new AtomicBoolean(true);
+    protected final AtomicInteger suspended = new AtomicInteger();
+
+    public void resume() {
+       assertRetained();
+        if( suspended.decrementAndGet() == 0 ) {
+            if( startup.compareAndSet(true, false) ) {
+                onStartup();
+            } else {
+                onResume();
+            }
+        }
+    }
+
+    public void suspend() {
+       assertRetained();
+        if( suspended.getAndIncrement()==0 ) {
+            onSuspend();
+        }
+    }
+
+    protected void onStartup() {
+    }
+
+    protected void onSuspend() {
+    }
+
+    protected void onResume() {
+    }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Fri Dec 18 04:28:49 2009
@@ -31,12 +31,12 @@
 import org.apache.activemq.dispatch.DispatchSource;
 import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatcherConfig;
-import org.apache.activemq.dispatch.internal.BaseRetained;
+import org.apache.activemq.dispatch.internal.BaseSuspendable;
 import org.apache.activemq.dispatch.internal.nio.NIODispatchSource;
 
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
-final public class AdvancedDispatcher extends BaseRetained implements Dispatcher {
+final public class AdvancedDispatcher extends BaseSuspendable implements Dispatcher {
 
     public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
 
@@ -61,13 +61,11 @@
             globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
         }
         loadBalancer = new SimpleLoadBalancer();
+        super.suspend();
     }
-
-    /**
-     * @see org.apache.activemq.dispatch.internal.advanced.DispatcherThread#start()
-     */
-    protected void startup() {
-        loadBalancer.start();
+    
+    @Override
+    protected void onStartup() {
         for (int i = 0; i < size; i++) {
             DispatcherThread dispatacher;
             try {
@@ -78,16 +76,28 @@
                 e.printStackTrace();
             }
         }
+
+    }
+
+    @Override
+    public void retain() {
+        super.retain();
+    }
+    
+    @Override
+    public void suspend() {
+        throw new UnsupportedOperationException();
     }
 
-    protected void shutdown() {
+    @Override
+    protected void onShutdown() {
         Runnable countDown = new Runnable() {
             AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
 
             public void run() {
                 if (shutdownCountDown.decrementAndGet() == 0) {
                     // Notify any registered shutdown watchers.
-                    AdvancedDispatcher.super.shutdown();
+                    AdvancedDispatcher.super.onShutdown();
                 }
             }
         };
@@ -97,6 +107,7 @@
         }
         loadBalancer.stop();
     }
+    
 
     /**
      * A Dispatcher must call this to indicate that is has started it's dispatch

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/nio/NIODispatchSource.java Fri Dec 18 04:28:49 2009
@@ -38,7 +38,6 @@
     private Runnable cancelHandler;
 
     public NIODispatchSource() {
-        super.retain();
     }
 
     /**
@@ -172,7 +171,7 @@
     }
 
     @Override
-    protected void shutdown() {
+    protected void onShutdown() {
         // actual close can only happen on the owning dispatch thread:
         if (key != null && key.isValid()) {
             // This will make sure that the key is removed

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Fri Dec 18 04:28:49 2009
@@ -26,6 +26,7 @@
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.internal.QueueSupport;
+import org.apache.activemq.util.IntrospectionSupport;
 
 /**
  * 
@@ -143,4 +144,9 @@
         return null;
     }
 
+    @Override
+    public String toString() {
+        return IntrospectionSupport.toString(this);
+    }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Fri Dec 18 04:28:49 2009
@@ -23,6 +23,7 @@
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
+import org.apache.activemq.util.IntrospectionSupport;
 
 /**
  * 
@@ -31,60 +32,61 @@
 public final class SerialDispatchQueue extends AbstractSerialDispatchQueue implements SimpleQueue {
 
     private final SimpleDispatcher dispatcher;
-    private volatile boolean stickToThreadOnNextDispatch; 
-    private volatile boolean stickToThreadOnNextDispatchRequest; 
-    
-    SerialDispatchQueue(SimpleDispatcher dispatcher, String label, DispatchOption...options) {
+
+    private volatile boolean stickToThreadOnNextDispatch;
+
+    private volatile boolean stickToThreadOnNextDispatchRequest;
+
+    SerialDispatchQueue(SimpleDispatcher dispatcher, String label, DispatchOption... options) {
         super(label, options);
         this.dispatcher = dispatcher;
-        if( getOptions().contains(DispatchOption.STICK_TO_DISPATCH_THREAD) ) {
-            stickToThreadOnNextDispatch=true;
+        if (getOptions().contains(DispatchOption.STICK_TO_DISPATCH_THREAD)) {
+            stickToThreadOnNextDispatch = true;
         }
     }
 
     @Override
     public void setTargetQueue(DispatchQueue targetQueue) {
-        GlobalDispatchQueue global = ((SimpleQueue)targetQueue).isGlobalDispatchQueue(); 
-        if( getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) && global!=null ) {
-            stickToThreadOnNextDispatchRequest=true;
+        assertRetained();
+        GlobalDispatchQueue global = ((SimpleQueue) targetQueue).isGlobalDispatchQueue();
+        if (getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) && global != null) {
+            stickToThreadOnNextDispatchRequest = true;
         }
         super.setTargetQueue(targetQueue);
     }
-    
+
     @Override
     public void dispatchAsync(Runnable runnable) {
-        
-        if( stickToThreadOnNextDispatchRequest ) {
+        assertRetained();
+
+        if (stickToThreadOnNextDispatchRequest) {
             SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
-            if( current!=null ) {
+            if (current != null) {
                 SimpleQueue parent;
-                while( (parent = current.getTargetQueue()) !=null ) {
+                while ((parent = current.getTargetQueue()) != null) {
                     current = parent;
                 }
-                if( current.isThreadDispatchQueue()==null ) {
-                    System.out.println("crap");
-                }
                 super.setTargetQueue(current);
-                stickToThreadOnNextDispatchRequest=false;
+                stickToThreadOnNextDispatchRequest = false;
             }
         }
 
         super.dispatchAsync(runnable);
     }
-    
+
     public void run() {
         SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
         SimpleDispatcher.CURRENT_QUEUE.set(this);
-        
+
         try {
-            if( stickToThreadOnNextDispatch ) {
-                stickToThreadOnNextDispatch=false;
+            if (stickToThreadOnNextDispatch) {
+                stickToThreadOnNextDispatch = false;
                 GlobalDispatchQueue global = current.isGlobalDispatchQueue();
-                if( global!=null ) {
+                if (global != null) {
                     setTargetQueue(global.getTargetQueue());
                 }
             }
-            
+
             DispatcherThread thread = DispatcherThread.currentDispatcherThread();
             dispatch(thread.executionCounter);
         } finally {
@@ -92,8 +94,9 @@
         }
 
     }
-    
+
     public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+        assertRetained();
         dispatcher.timerThread.addRelative(runnable, this, delay, unit);
     }
 
@@ -116,9 +119,14 @@
     public ThreadDispatchQueue isThreadDispatchQueue() {
         return null;
     }
-    
+
     public SimpleQueue getTargetQueue() {
+        assertRetained();
         return (SimpleQueue) targetQueue;
     }
 
+    @Override
+    public String toString() {
+        return IntrospectionSupport.toString(this, "label", "size", "suspended", "retained");
+    }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java Fri Dec 18 04:28:49 2009
@@ -21,14 +21,14 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchOption;
 import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatcherConfig;
-import org.apache.activemq.dispatch.DispatchOption;
-import org.apache.activemq.dispatch.internal.BaseRetained;
 import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
+import org.apache.activemq.dispatch.internal.BaseSuspendable;
 
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
@@ -37,7 +37,7 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-final public class SimpleDispatcher extends BaseRetained implements Dispatcher {
+final public class SimpleDispatcher extends BaseSuspendable implements Dispatcher {
 
     public final static ThreadLocal<SimpleQueue> CURRENT_QUEUE = new ThreadLocal<SimpleQueue>();
 
@@ -58,6 +58,7 @@
             globalQueues[i] = new GlobalDispatchQueue(this, DispatchPriority.values()[i]);
         }
         dispatchers = new DispatcherThread[config.getThreads()];
+        super.suspend();
     }
 
     public DispatchQueue getMainQueue() {
@@ -107,8 +108,14 @@
             }
         }
     }
+    
+    @Override
+    public void suspend() {
+        throw new UnsupportedOperationException();
+    }
 
-    protected void startup() {
+    @Override
+    protected void onStartup() {
         for (int i = 0; i < dispatchers.length; i++) {
             dispatchers[i] = new DispatcherThread(this, i);
             dispatchers[i].start();
@@ -117,7 +124,8 @@
         timerThread.start();
     }
 
-    public void shutdown() {
+    @Override
+    public void onShutdown() {
 
         Runnable countDown = new Runnable() {
             AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length);
@@ -125,7 +133,7 @@
             public void run() {
                 if (shutdownCountDown.decrementAndGet() == 0) {
                     // Notify any registered shutdown watchers.
-                    SimpleDispatcher.super.shutdown();
+                    SimpleDispatcher.super.onShutdown();
                 }
                 throw new DispatcherThread.Shutdown();
             }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java Fri Dec 18 04:28:49 2009
@@ -41,7 +41,7 @@
 
     public void testActorInvocation() throws Exception {
         Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
-        advancedSystem.retain();
+        advancedSystem.resume();
 
         DispatchQueue queue = advancedSystem.createSerialQueue("test");
         TestObjectActor actor = ActorProxy.create(TestObjectActor.class, new TestObject(), queue);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Fri Dec 18 04:28:49 2009
@@ -32,7 +32,7 @@
 
     public static void main(String[] args) throws Exception {
         Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
-        advancedSystem.retain();
+        advancedSystem.resume();
         benchmarkGlobal("advanced global queue", advancedSystem);
         benchmarkSerial("advanced private serial queue", advancedSystem);
 
@@ -44,7 +44,7 @@
         DispatcherConfig config = new DispatcherConfig();
         config.setThreads(6);
         Dispatcher simpleSystem = new SimpleDispatcher(config);
-        simpleSystem.retain();
+        simpleSystem.resume();
         
         benchmarkGlobal("simple global queue", simpleSystem);
         benchmarkSerial("simple private serial queue", simpleSystem);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Fri Dec 18 04:28:49 2009
@@ -32,7 +32,7 @@
     
     public static void main(String[] args) throws Exception {
         AdvancedDispatcher dispatcher = new AdvancedDispatcher(new DispatcherConfig());
-        dispatcher.retain();
+        dispatcher.resume();
         
         // warm the JIT up..
         benchmarkWork(dispatcher, 100000);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Dec 18 04:28:49 2009
@@ -92,7 +92,7 @@
 
     protected void startServices() throws Exception {
         dispatcher = createDispatcher();
-        dispatcher.retain();
+        dispatcher.resume();
         database = new BrokerDatabase(createStore());
         database.setDispatcher(dispatcher);
         if( TEST_MAX_STORE_LATENCY ) {

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BaseConnection.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.queue.actor.perf;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.queue.actor.transport.Transport;
+import org.apache.activemq.queue.actor.transport.TransportHandler;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class BaseConnection {
+
+    protected interface ConnectionStateActor extends TransportHandler {
+        void onStart();
+        void onStop();
+    }
+    
+    protected String name;
+    protected Dispatcher dispatcher;
+
+    protected DispatchQueue dispatchQueue;
+    protected ConnectionStateActor actor;
+
+    public void start() {
+        dispatchQueue = dispatcher.createSerialQueue(name);
+        createActor();
+        actor.onStart();
+    }
+
+    public void stop() throws InterruptedException {
+        actor.onStop();
+        RunnableCountDownLatch done = new RunnableCountDownLatch(1);
+        dispatchQueue.addShutdownWatcher(done);
+        dispatchQueue.release();
+        done.await();
+    }
+    
+    abstract protected void createActor();
+    
+    // The actor pattern ensures that this object is only accessed in
+    // serial execution context.  So synchronization is required.
+    // It also places a restriction that all operations should 
+    // avoid mutex contention and avoid blocking IO calls.
+    protected class ConnectionState implements ConnectionStateActor {
+        
+        final protected WindowController inboundSessionWindow = new WindowController();
+        final protected WindowLimiter outboundSessionWindow = new WindowLimiter();
+        final protected WindowLimiter outboundTransportWindow = new WindowLimiter();
+
+        protected Transport transport;
+        protected boolean disconnected;
+        protected Exception failure;
+
+        ConnectionState() {
+            outboundTransportWindow.size(100);
+        }
+        
+        public void onStart() {
+            dispatchQueue.addShutdownWatcher(new Runnable() {
+                public void run() {
+                    transport.setTargetQueue(dispatcher.getGlobalQueue());
+                    transport.release();
+                }
+            });
+            transport.setTargetQueue(dispatchQueue);
+            transport.setHandler(this);
+            transport.resume();
+        }
+        
+        public void onStop() {
+        }
+
+        public void onConnect() {
+            sendFlowControl(inboundSessionWindow.maxSize(1000));
+        }
+        
+        public void onDisconnect() {
+            disconnected = true;
+        }
+
+        public void onFailure(Exception failure) {
+            failure.printStackTrace();
+            this.failure = failure;
+        }
+
+        public void onRecevie(Object command) {
+            if (command.getClass() == Message.class) {
+                // We should not be getting messages
+                // when the window is closed..
+                if( inboundSessionWindow.isClosed() ) {
+                    onFailure(new Exception("Session overrun: " + command));
+                }
+                outboundSessionWindow.change(-1);
+                onReceiveMessage((Message) command);
+            } else if (command.getClass() == FlowControlBean.class || command.getClass() == FlowControlBuffer.class) {
+                onReceiveFlowControl((FlowControl) command);
+            } else if (command.getClass() == String.class) {
+                onReceiveString((String)command);
+            } else if (command.getClass() == DestinationBuffer.class || command.getClass() == DestinationBean.class) {
+                onReceiveDestination((Destination)command);
+            } else {
+                onFailure(new Exception("Unrecognized command: " + command));
+            }
+        }
+
+        public void sessionSend(Message message) {
+            transportSend(message);
+        }
+        
+        protected void onReceiveDestination(Destination command) {
+        }
+
+        protected void onReceiveString(String command) {
+        }
+
+        protected void onReceiveMessage(Message msg) {
+            sendFlowControl(inboundSessionWindow.processed(1));
+        }
+
+        private void sendFlowControl(int credits) {
+            if( credits!=0 ) {
+                FlowControlBean fc = new FlowControlBean();
+                fc.setCredit(credits);
+                transportSend(fc);
+            }
+        }
+        
+        public void transportSend(Object message) {
+            outboundTransportWindow.change(-1);
+            transport.send(message, onSendCompleted, dispatchQueue);
+        }
+        
+        private final Runnable onSendCompleted = new Runnable() {
+            public void run() {
+                boolean wasClosed = outboundTransportWindow.isClosed();
+                outboundTransportWindow.change(1);
+                if( !wasClosed && !isSessionSendBlocked() ) {
+                    onSessionResume();
+                }
+            }
+        };
+        
+        protected void onReceiveFlowControl(FlowControl command) {
+            boolean wasClosed = outboundSessionWindow.isClosed();
+            outboundSessionWindow.change(command.getCredit());
+            if( wasClosed && !isSessionSendBlocked() ) {
+                onSessionResume();
+            }
+
+        }
+
+        protected boolean isSessionSendBlocked() {
+            return outboundTransportWindow.isClosed() || outboundSessionWindow.isClosed(); 
+        }
+
+        protected void onSessionResume() {
+        }
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+    
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/BrokerConnection.java Fri Dec 18 04:28:49 2009
@@ -28,7 +28,7 @@
  */
 public class BrokerConnection extends BaseConnection implements DeliveryTarget {
     
-    interface BrokerProtocol extends Protocol {
+    interface BrokerConnectionStateActor extends ConnectionStateActor {
         public void onBrokerDispatch(Message msg, Runnable r);
     }
     
@@ -45,22 +45,22 @@
     }
     
     private MockBroker broker;
-    private BrokerProtocol brokerActor;
+    private BrokerConnectionStateActor brokerActor;
     private Transport transport;
     private int priorityLevels;
 
     protected void createActor() {
-        actor = brokerActor = ActorProxy.create(BrokerProtocol.class, new BrokerProtocolImpl(), dispatchQueue);
+        actor = brokerActor = ActorProxy.create(BrokerConnectionStateActor.class, new BrokerConnectionState(), dispatchQueue);
     }
 
-    protected class BrokerProtocolImpl extends ProtocolImpl implements BrokerProtocol {
+    protected class BrokerConnectionState extends ConnectionState implements BrokerConnectionStateActor {
 
         String name;
         
         @Override
-        public void start() {
+        public void onStart() {
             this.transport = BrokerConnection.this.transport;
-            super.start();
+            super.onStart();
         }
         
         // TODO: to increase fairness: we might want to have a pendingQueue per sender
@@ -81,7 +81,7 @@
             // is configured with.
             broker.router.route(msg, dispatchQueue, new Runnable() {
                 public void run() {
-                    BrokerProtocolImpl.super.onReceiveMessage(msg);
+                    BrokerConnectionState.super.onReceiveMessage(msg);
                 }
             });
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ClientConnection.java Fri Dec 18 04:28:49 2009
@@ -32,15 +32,15 @@
     }
 
     protected void createActor() {
-        actor = ActorProxy.create(Protocol.class, new ClientProtocolImpl(), dispatchQueue);
+        actor = ActorProxy.create(ConnectionStateActor.class, new ClientConnectionState(), dispatchQueue);
     }
 
-    protected class ClientProtocolImpl extends ProtocolImpl  {
+    protected class ClientConnectionState extends ConnectionState  {
         
         @Override
-        public void start()  {
+        public void onStart()  {
             transport = TransportFactorySystem.connect(dispatcher, connectUri);
-            super.start();
+            super.onStart();
         }
 
         public void onConnect() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ConsumerConnection.java Fri Dec 18 04:28:49 2009
@@ -37,16 +37,16 @@
     private final MetricCounter rate = new MetricCounter();
 
     protected void createActor() {
-        actor = ActorProxy.create(Protocol.class, new ProducerProtocolImpl(), dispatchQueue);
+        actor = ActorProxy.create(ConnectionStateActor.class, new ConsumerConnectionState(), dispatchQueue);
     }
 
-    class ProducerProtocolImpl extends ClientProtocolImpl {
+    class ConsumerConnectionState extends ClientConnectionState {
 
         @Override
-        public void start() {
+        public void onStart() {
             rate.name("Consumer " + name + " Rate");
             totalConsumerRate.add(rate);
-            super.start();
+            super.onStart();
         }
         
         @Override
@@ -61,7 +61,7 @@
                 dispatchQueue.dispatchAfter(new Runnable() {
                     public void run() {
                         rate.increment();
-                        ProducerProtocolImpl.super.onReceiveMessage(msg);
+                        ConsumerConnectionState.super.onReceiveMessage(msg);
                     }
                 }, thinkTime, TimeUnit.MILLISECONDS);
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/DispatchObjectFilter.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.perf;
+
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatchObjectFilter implements DispatchObject {
+    protected DispatchObject next;
+    
+    public DispatchObjectFilter() {
+    }
+
+    public DispatchObjectFilter(DispatchObject next) {
+        this.next = next;
+    }
+    
+    public void addShutdownWatcher(Runnable shutdownWatcher) {
+        next.addShutdownWatcher(shutdownWatcher);
+    }
+    public <Context> Context getContext() {
+        return next.getContext();
+    }
+    public DispatchQueue getTargetQueue() {
+        return next.getTargetQueue();
+    }
+    public void release() {
+        next.release();
+    }
+    public void resume() {
+        next.resume();
+    }
+    public void retain() {
+        next.retain();
+    }
+    public <Context> void setContext(Context context) {
+        next.setContext(context);
+    }
+    public void setTargetQueue(DispatchQueue queue) {
+        next.setTargetQueue(queue);
+    }
+    public void suspend() {
+        next.suspend();
+    }
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java Fri Dec 18 04:28:49 2009
@@ -18,6 +18,8 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.DispatchQueue;
@@ -29,6 +31,8 @@
 import org.apache.activemq.queue.actor.transport.TransportServer;
 import org.apache.activemq.queue.actor.transport.TransportServerHandler;
 
+import static java.util.concurrent.TimeUnit.*;
+
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -80,16 +84,18 @@
         transportServer.release();
 
         for (BrokerConnection connection : connections) {
-            connection.release();
+            connection.stop();
         }
         for (BrokerConnection connection : brokerConnections) {
-            connection.release();
+            connection.stop();
         }
         for (MockQueue queue : queues.values()) {
             queue.stop();
         }
     }
 
+    CountDownLatch bindLatch = new CountDownLatch(1);
+    
     public final void startServices() throws Exception {
         brokerDispatchQueue = dispatcher.createSerialQueue("broker");
         transportServer = TransportFactorySystem.bind(dispatcher, uri);
@@ -100,12 +106,24 @@
         for (MockQueue queue : queues.values()) {
             queue.start();
         }
+        
+        for (BrokerConnection connection : brokerConnections) {
+            connection.start();
+        }
+        
+        if( !bindLatch.await(5, SECONDS) ) {
+            throw new Exception("bind timeout");
+        }
+    }
+
+    public void onFailure(Exception error) {
+        bindLatch.countDown();
+        System.out.println("Accept error: " + error);
+        error.printStackTrace();
     }
 
     public void onBind() {
-        for (BrokerConnection connection : brokerConnections) {
-            connection.retain();
-        }
+        bindLatch.countDown();
     }
     
     public void onUnbind() {
@@ -117,18 +135,15 @@
         connection.setTransport(transport);
         connection.setPriorityLevels(MockBrokerTest.PRIORITY_LEVELS);
         connection.setDispatcher(dispatcher);
+        connection.setName("handler: "+transport.getRemoteAddress());
         connections.add(connection);
         try {
-            connection.retain();
+            connection.start();
         } catch (Exception e1) {
             onFailure(e1);
         }
     }
 
-    public void onFailure(Exception error) {
-        System.out.println("Accept error: " + error);
-        error.printStackTrace();
-    }
 
     public Dispatcher getDispatcher() {
         return dispatcher;
@@ -181,7 +196,7 @@
         broker.setName("Broker");
         broker.createDispatcher();
         try {
-            broker.getDispatcher().retain();
+            broker.getDispatcher().resume();
             broker.startServices();
         } catch (Exception e) {
             // TODO Auto-generated catch block

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBrokerTest.java Fri Dec 18 04:28:49 2009
@@ -18,8 +18,6 @@
 
 import java.util.ArrayList;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.flow.Commands.Destination;
@@ -27,12 +25,16 @@
 import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class MockBrokerTest extends TestCase {
+public class MockBrokerTest {
 
     protected static final int PERFORMANCE_SAMPLES = 3;
     protected static final int SAMPLING_FREQUENCY = 5;
@@ -58,8 +60,7 @@
     protected String receiveBrokerURI;
 
     // Set's the number of threads to use:
-    protected static final boolean SEPARATE_CLIENT_DISPATCHER = false;
-    protected final int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
+    static protected final int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
     protected boolean usePartitionedQueue = false;
 
     protected ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
@@ -67,7 +68,7 @@
     protected MockBroker rcvBroker;
     protected MockClient client;
 
-    protected Dispatcher dispatcher;
+    protected static Dispatcher dispatcher;
 
     static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
         public Long map(Message element) {
@@ -82,8 +83,22 @@
         }
     };
 
-    @Override
-    protected void setUp() throws Exception {
+    @BeforeClass
+    static public void setUpSuite() throws Exception {
+        dispatcher = DispatcherConfig.create("test", threadsPerDispatcher);
+        dispatcher.resume();
+    }
+    
+    @AfterClass
+    static public void tearDownSuite() throws Exception {
+        if( dispatcher!=null ) {
+            dispatcher.release();
+        }
+    }
+    
+    
+    @Before
+    public void setUp() throws Exception {
         if (tcp) {
             sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
             receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
@@ -98,10 +113,7 @@
         }
     }
 
-    protected Dispatcher createDispatcher(String name) {
-        return DispatcherConfig.create("test", threadsPerDispatcher);
-    }
-
+    @Test
     public void test_1_1_0() throws Exception {
 
         client = new MockClient();
@@ -109,67 +121,73 @@
         client.setDestCount(1);
         client.setNumConsumers(0);
 
-        createConnections(1);
+        createConnections("test_1_1_0", 1);
         runTestCase();
     }
 
+    @Test
     public void test_1_1_1() throws Exception {
         client = new MockClient();
         client.setNumProducers(1);
         client.setDestCount(1);
         client.setNumConsumers(1);
 
-        createConnections(1);
+        createConnections("test_1_1_1", 1);
         runTestCase();
     }
 
+    @Test
     public void test_10_10_10() throws Exception {
         client = new MockClient();
         client.setNumProducers(FANIN_COUNT);
         client.setDestCount(FANIN_COUNT);
         client.setNumConsumers(FANOUT_COUNT);
 
-        createConnections(FANIN_COUNT);
+        createConnections("test_10_10_10", FANIN_COUNT);
         runTestCase();
     }
 
+    @Test
     public void test_10_1_10() throws Exception {
         client = new MockClient();
         client.setNumProducers(FANIN_COUNT);
         client.setDestCount(1);
         client.setNumConsumers(FANOUT_COUNT);
 
-        createConnections(1);
+        createConnections("test_10_1_10", 1);
         runTestCase();
     }
 
+    @Test
     public void test_10_1_1() throws Exception {
         client = new MockClient();
         client.setNumProducers(FANIN_COUNT);
         client.setDestCount(1);
         client.setNumConsumers(1);
 
-        createConnections(1);
+        createConnections("test_1_1_1", 1);
         runTestCase();
     }
 
+    @Test
     public void test_1_1_10() throws Exception {
         client = new MockClient();
         client.setNumProducers(1);
         client.setDestCount(1);
         client.setNumConsumers(FANOUT_COUNT);
 
-        createConnections(1);
+        createConnections("test_1_1_10", 1);
         runTestCase();
     }
 
+    @Test
     public void test_2_2_2() throws Exception {
         client = new MockClient();
         client.setNumProducers(2);
         client.setDestCount(2);
         client.setNumConsumers(2);
 
-        createConnections(2);
+        createConnections("test_2_2_2", 2);
         runTestCase();
     }
 
@@ -180,25 +198,27 @@
      * 
      * @throws Exception
      */
+    @Test
     public void test_2_2_2_SlowConsumer() throws Exception {
         client = new MockClient();
         client.setNumProducers(2);
         client.setDestCount(2);
         client.setNumConsumers(2);
 
-        createConnections(2);
+        createConnections("test_2_2_2_SlowConsumer", 2);
         client.consumer(0).setThinkTime(50);
         runTestCase();
 
     }
 
+    @Test
     public void test_2_2_2_Selector() throws Exception {
         client = new MockClient();
         client.setNumProducers(2);
         client.setDestCount(2);
         client.setNumConsumers(2);
 
-        createConnections(2);
+        createConnections("test_2_2_2_Selector", 2);
 
         // Add properties to match producers to their consumers
         for (int i = 0; i < 2; i++) {
@@ -216,6 +236,7 @@
      * 
      * @throws Exception
      */
+    @Test
     public void test_2_1_1_HighPriorityProducer() throws Exception {
 
         client = new MockClient();
@@ -223,7 +244,7 @@
         client.setNumConsumers(1);
         client.setDestCount(1);
 
-        createConnections(1);
+        createConnections("test_2_1_1_HighPriorityProducer", 1);
         ProducerConnection producer = client.producer(0);
         client.includeInRateReport(producer);
         producer.setPriority(1);
@@ -240,13 +261,14 @@
      * 
      * @throws Exception
      */
+    @Test
     public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
         client = new MockClient();
         client.setNumProducers(2);
         client.setNumConsumers(1);
         client.setDestCount(1);
 
-        createConnections(1);
+        createConnections("test_2_1_1_MixedHighPriorityProducer", 1);
         ProducerConnection producer = client.producer(0);
         producer.setPriority(1);
         producer.setPriorityMod(3);
@@ -256,10 +278,7 @@
         runTestCase();
     }
 
-    private void createConnections(int destCount) throws Exception {
-
-        dispatcher = createDispatcher("BrokerDispatcher");
-        dispatcher.retain();
+    private void createConnections(String testName, int destCount) throws Exception {
 
         if (multibroker) {
             sendBroker = createBroker("SendBroker", sendBrokerURI);
@@ -288,16 +307,8 @@
             }
         }
 
-        Dispatcher clientDispatcher = null;
-        if (SEPARATE_CLIENT_DISPATCHER) {
-            clientDispatcher = createDispatcher("ClientDispatcher");
-            clientDispatcher.retain();
-        } else {
-            clientDispatcher = dispatcher;
-        }
-
         // Configure Client:
-        client.setDispatcher(clientDispatcher);
+        client.setDispatcher(dispatcher);
         client.setNumPriorities(PRIORITY_LEVELS);
         client.setSendBrokerURI(sendBroker.getUri());
         client.setReceiveBrokerURI(rcvBroker.getUri());
@@ -305,8 +316,7 @@
         client.setSamplingFrequency(1000 * SAMPLING_FREQUENCY);
         client.setThreadsPerDispatcher(threadsPerDispatcher);
         client.setPtp(ptp);
-        client.setTestName(getName());
-
+        client.setTestName(testName);
         client.createConnections();
     }
 
@@ -336,19 +346,15 @@
         try {
             client.runTest();
         } finally {
+            System.out.println("Shutting down..");
             stopServices();
         }
     }
 
     private void stopServices() throws Exception {
-
         for (MockBroker broker : brokers) {
             broker.stopServices();
         }
-
-        if (dispatcher != null) {
-            dispatcher.release();
-        }
     }
 
     private void startServices() throws Exception {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockClient.java Fri Dec 18 04:28:49 2009
@@ -227,8 +227,6 @@
     }
 
     public void runTest() throws Exception {
-        getDispatcher().retain();
-
         // Start 'em up.
         startServices();
         try {
@@ -241,21 +239,21 @@
     private void startServices() throws Exception {
 //        BaseTestConnection.setInShutdown(false, dispatcher);
         for (ConsumerConnection connection : consumers) {
-            connection.retain();
+            connection.start();
         }
 
         for (ProducerConnection connection : producers) {
-            connection.retain();
+            connection.start();
         }
     }
 
     private void stopServices() throws Exception {
 //        BaseTestConnection.setInShutdown(true, dispatcher);
         for (ProducerConnection connection : producers) {
-            connection.release();
+            connection.stop();
         }
         for (ConsumerConnection connection : consumers) {
-            connection.release();
+            connection.stop();
         }
     }
 
@@ -311,7 +309,7 @@
         System.out.println(IntrospectionSupport.toString(test));
         try
         {
-            test.getDispatcher().retain();
+            test.getDispatcher().resume();
             test.createConnections();
             test.runTest();
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/ProducerConnection.java Fri Dec 18 04:28:49 2009
@@ -40,16 +40,17 @@
     private AtomicLong messageIdGenerator;
 
     protected void createActor() {
-        actor = ActorProxy.create(Protocol.class, new ProducerProtocolImpl(), dispatchQueue);
+        actor = ActorProxy.create(ConnectionStateActor.class, new ProducerConnectionState(), dispatchQueue);
     }
 
-    class ProducerProtocolImpl extends ClientProtocolImpl {
+    class ProducerConnectionState extends ClientConnectionState {
         
         private String filler;
         private int payloadCounter;
+        private boolean stopped;
 
         @Override
-        public void start() {
+        public void onStart() {
             rate.name("Producer " + name + " Rate");
             totalProducerRate.add(rate);
 
@@ -60,7 +61,7 @@
                 }
                 filler = sb.toString();
             }
-            super.start();
+            super.onStart();
         }
         
         @Override
@@ -73,8 +74,14 @@
             produceMessages();
         }
         
+        @Override
+        public void onStop() {
+            stopped = true;
+            super.onStop();
+        }
+        
         private void produceMessages() {
-            while( !isSessionSendBlocked() ) {
+            while( !isSessionSendBlocked() && !stopped ) {
                 int p = priority;
                 if (priorityMod > 0) {
                     p = payloadCounter % priorityMod == 0 ? 0 : p;

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowController.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.perf;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class WindowController extends WindowLimiter {
+
+    private int maxSize;
+    private int processed;
+    private int creditsAt;
+    
+    public int processed(int count) {
+        int rc = 0;
+        processed += count;
+        if( processed >= creditsAt ) {
+            change(processed);
+            rc = processed;
+            processed = 0;
+        }
+        return rc;
+    }
+    
+    int maxSize(int newMaxSize) {
+        int change = newMaxSize-maxSize;
+        this.maxSize=newMaxSize;
+        this.creditsAt = maxSize/2;
+        change(change);
+        return change;
+    }
+    
+    int maxSize() {
+        return maxSize;
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java?rev=892128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/WindowLimiter.java Fri Dec 18 04:28:49 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.queue.actor.perf;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class WindowLimiter {
+
+    private int opensAt = 1;
+    private int size;
+    private boolean closed;
+    
+    public WindowLimiter() {
+        this.closed = true;
+    }
+
+    int size() {
+        return size;
+    }
+    
+    WindowLimiter size(int size) {
+        this.size = size;
+        return this;
+    }
+    
+    public boolean isOpen() {
+        return !closed;
+    }
+    
+    public boolean isClosed() {
+        return closed;
+    }
+    
+    public void change(int change) {
+        size += change;
+        if( change > 0 && closed && size >= opensAt) {
+            closed = false;
+        } else if( change < 0 && !closed && size <= 0) {
+            closed = true;
+        }
+    }
+
+}
\ No newline at end of file