You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:46:58 UTC

svn commit: r961083 - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ activemq-hawtdb/src/main/java...

Author: chirino
Date: Wed Jul  7 03:46:57 2010
New Revision: 961083

URL: http://svn.apache.org/viewvc?rev=961083&view=rev
Log:
- added async aware version of start/stop to the service interface
- cleaned up startup/shutdown logic in the broker and test case.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
      - copied, changed from r961082, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java
    activemq/sandbox/activemq-apollo-actor/pom.xml

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml Wed Jul  7 03:46:57 2010
@@ -91,6 +91,13 @@
       <version>${junit-version}</version>
     </dependency>
     <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest</artifactId>
+      <version>${scalatest-version}</version>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <scope>test</scope>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:46:57 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.apollo.broker
 
 import _root_.java.io.{File}
-import _root_.java.util.{LinkedList, LinkedHashMap, ArrayList}
 import _root_.org.apache.activemq.transport._
 import _root_.org.apache.activemq.Service
 import _root_.java.lang.{String}
@@ -27,6 +26,8 @@ import _root_.org.fusesource.hawtdispatc
 import _root_.scala.collection.JavaConversions._
 import _root_.scala.reflect.BeanProperty
 import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
+import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
+import java.util.concurrent.{TimeUnit, CountDownLatch}
 
 object BrokerFactory {
 
@@ -65,6 +66,63 @@ object BrokerFactory {
 
 }
 
+class CompletionTracker(val queue:DispatchQueue=getGlobalQueue) {
+  private[this] val tasks = new HashSet[Runnable]()
+  private[this] var _callback:Runnable = null
+  queue.retain
+
+  def task(name:String="unknown"):Runnable = {
+    val rc = new Runnable() {
+      def run():Unit = {
+        tasks.synchronized {
+          if( tasks.remove(this) ) {
+            if( tasks.isEmpty && _callback!=null ) {
+              _callback ->: queue
+              queue.release
+            }
+          }
+        }
+      }
+      override def toString = name
+    }
+    tasks.synchronized {
+      if( _callback!=null ) {
+        throw new IllegalStateException("all tasks should be created before setting the callback");
+      }
+      tasks.add(rc)
+    }
+    return rc
+  }
+
+  def callback(handler: =>Unit ) {
+    tasks.synchronized {
+      _callback = handler _
+      if(  tasks.isEmpty ) {
+        _callback ->: queue
+        queue.release
+      }
+    }
+  }
+
+  def await() = {
+    val latch =new CountDownLatch(1)
+    callback {
+      latch.countDown
+    }
+    latch.await
+  }
+
+  def await(timeout:Long, unit:TimeUnit) = {
+    val latch = new CountDownLatch(1)
+    callback {
+      latch.countDown
+    }
+    latch.await(timeout, unit)
+  }
+
+  override def toString = tasks.synchronized { "waiting on: "+tasks }
+}
+
 object BufferConversions {
 
   implicit def toAsciiBuffer(value:String) = new AsciiBuffer(value)
@@ -109,8 +167,11 @@ class Broker() extends Service with Disp
   @BeanProperty
   var defaultVirtualHost: VirtualHost = null
 
-  def start = runtime.start
-  def stop = runtime.stop
+  def start = runtime.start(null)
+  def start(onComplete:Runnable) = runtime.start(onComplete)
+
+  def stop = runtime.stop(null)
+  def stop(onComplete:Runnable) = runtime.stop(onComplete)
 
   val dispatchQueue = createQueue("broker");
   dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
@@ -148,9 +209,10 @@ class Broker() extends Service with Disp
         debug("Accepted connection from: %s", transport.getRemoteAddress)
         var connection = new BrokerConnection(Broker.this)
         connection.transport = transport
+        connection.dispatchQueue.retain
         clientConnections.add(connection)
         try {
-          connection.start
+          connection.start()
         }
         catch {
           case e1: Exception => {
@@ -161,7 +223,13 @@ class Broker() extends Service with Disp
     }
 
     var state = CONFIGURATION
-    val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
+    val clientConnections: HashSet[Connection] = new HashSet[Connection]
+
+    def stopped(connection:Connection) = ^{
+      if( clientConnections.remove(connection) ) {
+        connection.dispatchQueue.release
+      }
+    } ->: dispatchQueue
 
     def removeConnectUri(uri: String): Unit = ^ {
       connectUris.remove(uri)
@@ -202,7 +270,11 @@ class Broker() extends Service with Disp
       new ArrayList[TransportServer](transportServers)
     } ->: dispatchQueue
 
-    def start = ^ {
+    def start(onCompleted:Runnable) = ^ {
+      _start(onCompleted)
+    } ->: dispatchQueue
+
+    def _start(onCompleted:Runnable) = {
       if (state == CONFIGURATION) {
         // We can apply defaults now
         if (dataDirectory == null) {
@@ -218,23 +290,37 @@ class Broker() extends Service with Disp
 
         state = STARTING
 
+        val tracker = new CompletionTracker(dispatchQueue)
         for (virtualHost <- virtualHosts.values) {
-          virtualHost.start
+          virtualHost.start(tracker.task("virtual host: "+virtualHost))
         }
         for (server <- transportServers) {
           server.setDispatchQueue(dispatchQueue)
           server.setAcceptListener(new BrokerAcceptListener)
-          server.start
+          server.start(tracker.task("transport server: "+server))
         }
-        state = RUNNING
+        tracker.callback {
+          state = RUNNING
+          if( onCompleted!=null ) {
+            onCompleted.run
+          }
+        }
+
       } else {
         warn("Can only start a broker that is in the " + CONFIGURATION + " state.  Broker was " + state)
       }
-    } ->: dispatchQueue
+    }
+
 
-    def stop: Unit = ^ {
+    def stop(onCompleted:Runnable): Unit = ^ {
       if (state == RUNNING) {
         state = STOPPING
+        dispatchQueue.setDisposer(^{
+          if( onCompleted!=null ) {
+            state = STOPPED;
+            onCompleted.run
+          }
+        })
 
         for (server <- transportServers) {
           stopService(server)
@@ -245,9 +331,8 @@ class Broker() extends Service with Disp
         for (virtualHost <- virtualHosts.values) {
           stopService(virtualHost)
         }
-        state = STOPPED;
+        dispatchQueue.release
       }
-
     } ->: dispatchQueue
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:46:57 2010
@@ -44,19 +44,25 @@ abstract class Connection() extends Tran
   
   var name = "connection"
   var stopped = false;
-
   var transport:Transport = null
 
-  def start() = {
+  def start() = start(null)
+
+  def start(onCompleted:Runnable) = {
     transport.setDispatchQueue(dispatchQueue);
     transport.setTransportListener(Connection.this);
-    transport.start()
+    transport.start(onCompleted)
   }
 
-  def stop() = {
-    stopped=true
-    transport.stop()
-    dispatchQueue.release
+  def stop() = stop(null)
+
+  def stop(onCompleted:Runnable) = {
+    if( !stopped ) {
+      stopped=true
+      transport.stop()
+      dispatchQueue.setDisposer(onCompleted)
+      dispatchQueue.release
+    }
   }
 
   def onTransportFailure(error:IOException) = {
@@ -83,10 +89,19 @@ class BrokerConnection(val broker: Broke
   var protocol = "stomp"
   var protocolHandler: ProtocolHandler = null;
 
-  override def start() = {
+  override def start(onCompleted:Runnable) = {
+    broker.dispatchQueue.retain
     protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
     protocolHandler.setConnection(this);
-    super.start
+    super.start(onCompleted)
+  }
+
+  override def stop(onCompleted:Runnable) = {
+    if( !stopped ) {
+      broker.runtime.stopped(this)
+      broker.dispatchQueue.release
+      super.stop(onCompleted)
+    }
   }
 
   override def onTransportConnected() = protocolHandler.onTransportConnected

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul  7 03:46:57 2010
@@ -26,6 +26,11 @@ trait Log {
   val log = LogFactory.getLog(getClass.getName)
 }
 
+class NamedLog(name:String) extends Log {
+  def this(clazz:Class[_]) = this(clazz.getName)
+  override val log = LogFactory.getLog(name)
+}
+
 object Logging {
   val exception_id_generator = new AtomicLong(System.currentTimeMillis)
   def next_exception_id = exception_id_generator.incrementAndGet.toHexString
@@ -37,7 +42,7 @@ object Logging {
 trait Logging {
 
   import Logging._
-  protected def log: Log
+  protected def log: Log = new NamedLog(getClass)
   protected def log_map(message:String) = message
 
   protected def error(message: => String, args:Any*): Unit = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 03:46:57 2010
@@ -54,23 +54,26 @@ class VirtualHost() extends Service with
   @BeanProperty
   var transactionManager:TransactionManager = new TransactionManager
 
+  override def toString = names.head
 
-  def start():Unit = {
-      if (started) {
-          return;
-      }
 
-      database.virtualHost = this
-      database.start();
+  def start() = start(null)
+  def start(onCompleted:Runnable):Unit = {
+    if (started) {
+        return;
+    }
+
+    database.virtualHost = this
+    database.start();
 
 //      router.setDatabase(database);
 
-      //Recover queues:
-      queueStore.setDatabase(database);
-      queueStore.setDispatchQueue(q);
-      queueStore.loadQueues();
+    //Recover queues:
+    queueStore.setDatabase(database);
+    queueStore.setDispatchQueue(q);
+    queueStore.loadQueues();
 
-      // Create Queue instances
+    // Create Queue instances
 //        TODO:
 //        for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
 //            Queue queue = new Queue(iQueue);
@@ -84,15 +87,19 @@ class VirtualHost() extends Service with
 //            queue.start();
 //        }
 
-      //Recover transactions:
-      transactionManager.virtualHost = this
-      transactionManager.loadTransactions();
-      started = true;
+    //Recover transactions:
+    transactionManager.virtualHost = this
+    transactionManager.loadTransactions();
+    started = true;
+    
+    if( onCompleted!=null ) {
+      onCompleted.run
+    }
   }
 
 
-
-  def stop():Unit = {
+  def stop() = start(null)
+  def stop(onCompleted:Runnable):Unit = {
 
       if (!started) {
           return;
@@ -211,7 +218,7 @@ class VirtualHost() extends Service with
   }
 }
 
-class BrokerDatabase() extends Service {
+class BrokerDatabase() {
 
   @BeanProperty
   var store:Store=new MemoryStore;

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (from r961082, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala&r1=961082&r2=961083&rev=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 03:46:57 2010
@@ -25,126 +25,18 @@ import org.apache.activemq.transport.Tra
 
 import _root_.scala.collection.JavaConversions._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.apache.activemq.apollo.broker._
 import org.apache.activemq.util.buffer.AsciiBuffer
 import org.apache.activemq.broker.store.{Store, StoreFactory}
 import java.io.{File, IOException}
-import java.util.concurrent.TimeUnit
 import java.util.ArrayList
+import org.scalatest.Informer
+import org.fusesource.hawtdispatch.BaseRetained
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import org.apache.activemq.apollo.broker._
 
 
-abstract class RemoteConsumer extends Connection {
-  val consumerRate = new MetricCounter();
-  var totalConsumerRate: MetricAggregator = null
-  var thinkTime: Long = 0
-  var destination: Destination = null
-  var selector: String = null;
-  var durable = false;
-  var uri: String = null
-  var brokerPerfTest:BaseBrokerPerfTest = null
-
-  override def start() = {
-    consumerRate.name("Consumer " + name + " Rate");
-    totalConsumerRate.add(consumerRate);
-    transport = TransportFactory.connect(uri);
-    super.start();
-  }
-
-
-  override def onTransportConnected() = {
-    setupSubscription();
-    transport.resumeRead
-  }
-
-  override def onTransportFailure(error: IOException) = {
-    if (!brokerPerfTest.stopping.get()) {
-      System.err.println("Client Async Error:");
-      error.printStackTrace();
-    }
-  }
-
-  protected def setupSubscription()
-
-}
-
-
-abstract class RemoteProducer extends Connection {
-  val rate = new MetricCounter();
-
-  var messageIdGenerator: AtomicLong = null
-  var priority = 0
-  var persistentDelivery = false
-  var priorityMod = 0
-  var counter = 0
-  var producerId = 0
-  var destination: Destination = null
-  var property: String = null
-  var totalProducerRate: MetricAggregator = null
-  var next: Delivery = null
-  var thinkTime: Long = 0
-
-  var filler: String = null
-  var payloadSize = 20
-  var uri: String = null
-  var brokerPerfTest:BaseBrokerPerfTest = null
-
-  override def onTransportFailure(error: IOException) = {
-    if (!brokerPerfTest.stopping.get()) {
-      System.err.println("Client Async Error:");
-      error.printStackTrace();
-    }
-  }
-
-  override def start() = {
-
-    if (payloadSize > 0) {
-      var sb = new StringBuilder(payloadSize);
-      for (i <- 0 until payloadSize) {
-        sb.append(('a' + (i % 26)).toChar);
-      }
-      filler = sb.toString();
-    }
-
-    rate.name("Producer " + name + " Rate");
-    totalProducerRate.add(rate);
-
-    transport = TransportFactory.connect(uri);
-    super.start();
-
-  }
-
-  override def onTransportConnected() = {
-    setupProducer();
-    transport.resumeRead
-  }
-
-  def setupProducer()
-
-def createPayload(): String = {
-    if (payloadSize >= 0) {
-      var sb = new StringBuilder(payloadSize);
-      sb.append(name);
-      sb.append(':');
-      counter += 1
-      sb.append(counter);
-      sb.append(':');
-      var length = sb.length;
-      if (length <= payloadSize) {
-        sb.append(filler.subSequence(0, payloadSize - length));
-        return sb.toString();
-      } else {
-        return sb.substring(0, payloadSize);
-      }
-    } else {
-      counter += 1
-      return name + ":" + (counter);
-    }
-  }
-
-}
-
-object BaseBrokerPerfTest {
-  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3000000"))
+object BaseBrokerPerfSupport {
+  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
   var IO_WORK_AMOUNT = 0
   var FANIN_COUNT = 10
   var FANOUT_COUNT = 10
@@ -157,8 +49,9 @@ object BaseBrokerPerfTest {
   var DURABLE = false;
 
 }
-abstract class BaseBrokerPerfTest {
-  import BaseBrokerPerfTest._
+
+abstract class BaseBrokerPerfSupport extends FunSuiteSupport {
+  import BaseBrokerPerfSupport._
 
   // Set to put senders and consumers on separate brokers.
   protected var multibroker = false;
@@ -191,10 +84,10 @@ abstract class BaseBrokerPerfTest {
 
   val producers = new ArrayList[RemoteProducer]()
   val consumers = new ArrayList[RemoteConsumer]()
-  var name: String = null;
 
-  @Before
-  def setUp() = {
+
+  override protected def beforeAll(configMap: Map[String, Any]) = {
+    super.beforeAll(configMap)
     if (tcp) {
       sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
       receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
@@ -214,41 +107,28 @@ abstract class BaseBrokerPerfTest {
     }
   }
 
-  def setName(name: String) = {
-    if (this.name == null) {
-      this.name = name;
-    }
-  }
-
-  def getName() = name
-
   def getBrokerWireFormat() = "multi"
 
   def getRemoteWireFormat(): String
 
-  @Test
-  def benchmark_1_1_0(): Unit = {
-    setName("1 producer -> 1 destination -> 0 consumers");
-    if (ptp) {
-      return;
-    }
-    producerCount = 1;
-    destCount = 1;
-
-    createConnections();
-
-    // Start 'em up.
-    startClients();
-    try {
-      reportRates();
-    } finally {
-      stopServices();
+  if (!ptp) {
+    test("1 producer -> 1 destination -> 0 consumers") {
+      producerCount = 1;
+      destCount = 1;
+
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+        reportRates();
+      } finally {
+        stopServices();
+      }
     }
   }
 
-  @Test
-  def benchmark_1_1_1() = {
-    setName("1 producer -> 1 destination -> 1 consumers");
+  test("1 producer -> 1 destination -> 1 consumers") {
     producerCount = 1;
     destCount = 1;
     consumerCount = 1;
@@ -265,9 +145,7 @@ abstract class BaseBrokerPerfTest {
     }
   }
 
-  @Test
-  def benchmark_10_1_10() = {
-    setName(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT));
+  test(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT)) {
     producerCount = FANIN_COUNT;
     consumerCount = FANOUT_COUNT;
     destCount = 1;
@@ -283,9 +161,7 @@ abstract class BaseBrokerPerfTest {
     }
   }
 
-  @Test
-  def benchmark_10_1_1() = {
-    setName(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT));
+  test(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT)) {
     producerCount = FANIN_COUNT;
     destCount = 1;
     consumerCount = 1;
@@ -301,9 +177,7 @@ abstract class BaseBrokerPerfTest {
     }
   }
 
-  @Test
-  def benchmark_1_1_10() = {
-    setName(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT));
+  test(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT)) {
     producerCount = 1;
     destCount = 1;
     consumerCount = FANOUT_COUNT;
@@ -319,9 +193,7 @@ abstract class BaseBrokerPerfTest {
     }
   }
 
-  @Test
-  def benchmark_2_2_2() = {
-    setName(format("2 producer -> 2 destination -> 2 consumers"));
+  test("2 producer -> 2 destination -> 2 consumers") {
     producerCount = 2;
     destCount = 2;
     consumerCount = 2;
@@ -337,9 +209,7 @@ abstract class BaseBrokerPerfTest {
     }
   }
 
-  @Test
-  def benchmark_10_10_10() = {
-    setName(format("10 producers -> 10 destinations -> 10 consumers"));
+  test("10 producers -> 10 destinations -> 10 consumers") {
     producerCount = 10;
     destCount = 10;
     consumerCount = 10;
@@ -362,9 +232,7 @@ abstract class BaseBrokerPerfTest {
    *
    * @throws Exception
    */
-  @Test
-  def benchmark_2_2_2_SlowConsumer() = {
-    setName(format("2 producer -> 2 destination -> 2 slow consumers"));
+  test("2 producer -> 2 destination -> 2 slow consumers") {
     producerCount = 2;
     destCount = 2;
     consumerCount = 2;
@@ -381,9 +249,7 @@ abstract class BaseBrokerPerfTest {
     }
   }
 
-  @Test
-  def benchmark_2_2_2_Selector() = {
-    setName(format("2 producer -> 2 destination -> 2 selector consumers"));
+  test("2 producer -> 2 destination -> 2 selector consumers") {
     producerCount = 2;
     destCount = 2;
     consumerCount = 2;
@@ -412,10 +278,7 @@ abstract class BaseBrokerPerfTest {
    *
    * @throws Exception
    */
-  @Test
-  def benchmark_2_1_1_HighPriorityProducer() = {
-
-    setName(format("1 high and 1 normal priority producer -> 1 destination -> 1 consumer"));
+  test("1 high and 1 normal priority producer -> 1 destination -> 1 consumer") {
     producerCount = 2;
     destCount = 1;
     consumerCount = 1;
@@ -430,14 +293,13 @@ abstract class BaseBrokerPerfTest {
     // Start 'em up.
     startClients();
     try {
-
-      System.out.println("Checking rates for test: " + getName());
+      println("Checking rates...");
       for (i <- 0 until PERFORMANCE_SAMPLES) {
         var p = new Period();
         Thread.sleep(1000 * 5);
-        System.out.println(producer.rate.getRateSummary(p));
-        System.out.println(totalProducerRate.getRateSummary(p));
-        System.out.println(totalConsumerRate.getRateSummary(p));
+        println(producer.rate.getRateSummary(p));
+        println(totalProducerRate.getRateSummary(p));
+        println(totalConsumerRate.getRateSummary(p));
         totalProducerRate.reset();
         totalConsumerRate.reset();
       }
@@ -453,10 +315,7 @@ abstract class BaseBrokerPerfTest {
    *
    * @throws Exception
    */
-  @Test
-  def benchmark_2_1_1_MixedHighPriorityProducer() = {
-
-    setName(format("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer"));
+  test("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer") {
     producerCount = 2;
     destCount = 1;
     consumerCount = 1;
@@ -473,13 +332,13 @@ abstract class BaseBrokerPerfTest {
     startClients();
     try {
 
-      System.out.println("Checking rates for test: " + getName());
+      println("Checking rates...");
       for (i <- 0 until PERFORMANCE_SAMPLES) {
         var p = new Period();
         Thread.sleep(1000 * 5);
-        System.out.println(producer.rate.getRateSummary(p));
-        System.out.println(totalProducerRate.getRateSummary(p));
-        System.out.println(totalConsumerRate.getRateSummary(p));
+        println(producer.rate.getRateSummary(p));
+        println(totalProducerRate.getRateSummary(p));
+        println(totalConsumerRate.getRateSummary(p));
         totalProducerRate.reset();
         totalConsumerRate.reset();
       }
@@ -490,12 +349,12 @@ abstract class BaseBrokerPerfTest {
   }
 
   def reportRates() = {
-    System.out.println("Checking rates for test: " + getName() + ", " + (if (ptp) {"ptp"} else {"topic"}));
+    println("Checking "+(if (ptp) "ptp" else "topic")+" rates...");
     for (i <- 0 until PERFORMANCE_SAMPLES) {
       var p = new Period();
       Thread.sleep(1000 * 5);
-      System.out.println(totalProducerRate.getRateSummary(p));
-      System.out.println(totalConsumerRate.getRateSummary(p));
+      println(totalProducerRate.getRateSummary(p));
+      println(totalConsumerRate.getRateSummary(p));
       totalProducerRate.reset();
       totalConsumerRate.reset();
     }
@@ -601,35 +460,154 @@ abstract class BaseBrokerPerfTest {
 
   private def stopServices() = {
     stopping.set(true);
+    val tracker = new CompletionTracker
     for (broker <- brokers) {
-      broker.stop();
+      broker.stop(tracker.task());
     }
+    brokers.clear
     for (connection <- producers) {
-      connection.stop();
+      connection.stop(tracker.task());
     }
+    producers.clear
     for (connection <- consumers) {
-      connection.stop();
+      connection.stop(tracker.task());
     }
+    consumers.clear
+    println("waiting for services to stop");
+    tracker.await
+    stopping.set(false)
   }
 
   private def startBrokers() = {
+    val tracker = new CompletionTracker
     for (broker <- brokers) {
-      broker.start();
+      broker.start(tracker.task());
     }
+    tracker.await
   }
 
+
   private def startClients() = {
-    // Start the clients after a delay to give the server a chance to startup.
-    getGlobalQueue.dispatchAfter(200, TimeUnit.MILLISECONDS, ^{
-      for (connection <- consumers) {
-        connection.start();
+    var tracker = new CompletionTracker
+    for (connection <- consumers) {
+      connection.start(tracker.task());
+    }
+    tracker.await
+    tracker = new CompletionTracker
+    for (connection <- producers) {
+      connection.start(tracker.task());
+    }
+    tracker.await
+  }
+
+}
+
+abstract class RemoteConsumer extends Connection {
+  val consumerRate = new MetricCounter();
+  var totalConsumerRate: MetricAggregator = null
+  var thinkTime: Long = 0
+  var destination: Destination = null
+  var selector: String = null;
+  var durable = false;
+  var uri: String = null
+  var brokerPerfTest:BaseBrokerPerfSupport = null
+
+  override def start(onComplete:Runnable) = {
+    consumerRate.name("Consumer " + name + " Rate");
+    totalConsumerRate.add(consumerRate);
+    transport = TransportFactory.connect(uri);
+    super.start(onComplete);
+  }
+
+
+  override def onTransportConnected() = {
+    setupSubscription();
+    transport.resumeRead
+  }
+
+  override def onTransportFailure(error: IOException) = {
+    if (!brokerPerfTest.stopping.get()) {
+      System.err.println("Client Async Error:");
+      error.printStackTrace();
+    }
+  }
+
+  protected def setupSubscription()
+
+}
+
+
+abstract class RemoteProducer extends Connection {
+  val rate = new MetricCounter();
+
+  var messageIdGenerator: AtomicLong = null
+  var priority = 0
+  var persistentDelivery = false
+  var priorityMod = 0
+  var counter = 0
+  var producerId = 0
+  var destination: Destination = null
+  var property: String = null
+  var totalProducerRate: MetricAggregator = null
+  var next: Delivery = null
+  var thinkTime: Long = 0
+
+  var filler: String = null
+  var payloadSize = 20
+  var uri: String = null
+  var brokerPerfTest:BaseBrokerPerfSupport = null
+
+  override def onTransportFailure(error: IOException) = {
+    if (!brokerPerfTest.stopping.get()) {
+      System.err.println("Client Async Error:");
+      error.printStackTrace();
+    }
+  }
+
+  override def start(onComplete:Runnable) = {
+
+    if (payloadSize > 0) {
+      var sb = new StringBuilder(payloadSize);
+      for (i <- 0 until payloadSize) {
+        sb.append(('a' + (i % 26)).toChar);
       }
-    })
-    getGlobalQueue.dispatchAfter(400, TimeUnit.MILLISECONDS, ^{
-      for (connection <- producers) {
-        connection.start();
+      filler = sb.toString();
+    }
+
+    rate.name("Producer " + name + " Rate");
+    totalProducerRate.add(rate);
+
+    transport = TransportFactory.connect(uri);
+    super.start(onComplete);
+
+  }
+
+  override def onTransportConnected() = {
+    setupProducer();
+    transport.resumeRead
+  }
+
+  def setupProducer()
+
+def createPayload(): String = {
+    if (payloadSize >= 0) {
+      var sb = new StringBuilder(payloadSize);
+      sb.append(name);
+      sb.append(':');
+      counter += 1
+      sb.append(counter);
+      sb.append(':');
+      var length = sb.length;
+      if (length <= payloadSize) {
+        sb.append(filler.subSequence(0, payloadSize - length));
+        return sb.toString();
+      } else {
+        return sb.substring(0, payloadSize);
       }
-    })
+    } else {
+      counter += 1
+      return name + ":" + (counter);
+    }
   }
 
-}
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala?rev=961083&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala Wed Jul  7 03:46:57 2010
@@ -0,0 +1,33 @@
+package org.apache.activemq.apollo.broker.perf
+
+import _root_.org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import java.io.File
+import java.lang.String
+import collection.immutable.Map
+import org.apache.activemq.apollo.broker.Logging
+
+/**
+ * @version $Revision : 1.1 $
+ */
+@RunWith(classOf[JUnitRunner])
+abstract class FunSuiteSupport extends FunSuite with Logging with BeforeAndAfterAll {
+  protected var _basedir = "."
+
+  /**
+   * Returns the base directory of the current project
+   */
+  def baseDir = {
+    new File(_basedir)
+  }
+
+
+  override protected def beforeAll(map: Map[String, Any]): Unit = {
+    _basedir = map.get("basedir") match {
+      case Some(basedir) => basedir.toString
+      case _ => System.getProperty("basedir", ".")
+    }
+    debug("using basedir: " + _basedir)
+  }
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Wed Jul  7 03:46:57 2010
@@ -128,12 +128,20 @@ public class KahaDBStore implements Stor
         if (started.compareAndSet(false, true)) {
             try {
                 load();
+
             } catch (Exception e) {
                 LOG.error("Error loading store", e);
             }
         }
     }
 
+    public void start(Runnable onComplete) throws Exception {
+        start();
+        if( onComplete!=null ) {
+            onComplete.run();
+        }
+    }
+
     public void stop() throws Exception {
         if (started.compareAndSet(true, false)) {
             flush();
@@ -141,6 +149,13 @@ public class KahaDBStore implements Stor
         }
     }
 
+    public void stop(Runnable onComplete) throws Exception {
+        stop();
+        if( onComplete!=null ) {
+            onComplete.run();
+        }
+    }
+
     /**
      * @return a unique sequential store tracking number.
      */

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml Wed Jul  7 03:46:57 2010
@@ -87,6 +87,12 @@
       <version>${junit-version}</version>
     </dependency>
     <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest</artifactId>
+      <version>${scalatest-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <!--<scope>test</scope>-->

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:46:57 2010
@@ -30,14 +30,7 @@ import _root_.org.apache.activemq.apollo
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import org.fusesource.hawtdispatch.BaseRetained
 
-object StompBrokerPerfTest {
-  def main(args:Array[String]) = {
-    val test = new StompBrokerPerfTest();
-    test.setUp
-    test.benchmark_1_1_1
-  }
-}
-class StompBrokerPerfTest extends BaseBrokerPerfTest {
+class StompBrokerPerfTest extends BaseBrokerPerfSupport {
 
     override def createProducer() =  new StompRemoteProducer()
     override def createConsumer() = new StompRemoteConsumer()

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Wed Jul  7 03:46:57 2010
@@ -641,9 +641,21 @@ public class MemoryStore implements Stor
     public void start() throws Exception {
     }
 
+    public void start(Runnable onComplete) throws Exception {
+        if( onComplete!=null ) {
+            onComplete.run();
+        }
+    }
+
     public void stop() throws Exception {
     }
 
+    public void stop(Runnable onComplete) throws Exception {
+        if( onComplete!=null ) {
+            onComplete.run();
+        }
+    }
+
     public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable runnable) throws T {
         R rc = callback.execute(session);
         if (runnable != null) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:46:57 2010
@@ -119,6 +119,9 @@ public class TcpTransport implements Tra
     }
 
     public void start() throws Exception {
+        start(null);
+    }
+    public void start(Runnable onCompleted) throws Exception {
         if (dispatchQueue == null) {
             throw new IllegalArgumentException("dispatchQueue is not set");
         }
@@ -169,6 +172,10 @@ public class TcpTransport implements Tra
         } else {
             fireConnected();
         }
+        if( onCompleted!=null ) {
+            dispatchQueue.execute(onCompleted);
+        }
+
     }
 
 
@@ -232,11 +239,15 @@ public class TcpTransport implements Tra
 
 
     public void stop() throws Exception {
+        stop(null);
+    }
+    public void stop(Runnable onCompleted) throws Exception {
         if (transportState != RUNNING) {
             throw new IllegalStateException("stop can only be used from the started state");
         }
         transportState = DISPOSED;
         readSource.cancel();
+        writeSource.setDisposer(onCompleted);
         writeSource.cancel();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul  7 03:46:57 2010
@@ -84,7 +84,10 @@ public class TcpTransportServer implemen
         acceptSource.resume();
     }
 
-    public void start() throws IOException {
+    public void start() throws Exception {
+        start(null);
+    }
+    public void start(Runnable onCompleted) throws IOException {
         URI bind = bindURI;
 
         String host = bind.getHost();
@@ -138,6 +141,9 @@ public class TcpTransportServer implemen
             }
         });
         acceptSource.resume();
+        if( onCompleted!=null ) {
+            dispatchQueue.execute(onCompleted);
+        }
     }
 
     private URI connectURI(String hostname) throws URISyntaxException {
@@ -160,6 +166,10 @@ public class TcpTransportServer implemen
     }
 
     public void stop() throws Exception {
+        stop(null);
+    }
+    public void stop(Runnable onCompleted) throws Exception {
+        acceptSource.setDisposer(onCompleted);
         acceptSource.release();
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul  7 03:46:57 2010
@@ -90,6 +90,10 @@ public class TransportFilter implements 
         next.start();
     }
 
+    public void start(Runnable onComplete) throws Exception {
+        next.start(onComplete);
+    }
+
     /**
      * @see org.apache.activemq.Service#stop()
      */
@@ -97,6 +101,10 @@ public class TransportFilter implements 
         next.stop();
     }
 
+    public void stop(Runnable onComplete) throws Exception {
+        next.stop(onComplete);
+    }
+
     public void onTransportCommand(Object command) {
         transportListener.onTransportCommand(command);
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul  7 03:46:57 2010
@@ -67,6 +67,9 @@ public class PipeTransport implements Tr
     }
 
     public void start() throws Exception {
+        start(null);
+    }
+    public void start(final Runnable onCompleted) throws Exception {
         if (dispatchQueue == null) {
             throw new IllegalArgumentException("dispatchQueue is not set");
         }
@@ -107,6 +110,10 @@ public class PipeTransport implements Tr
                     fireConnected();
                     peer.fireConnected();
                 }
+                if( onCompleted!=null ) {
+                    onCompleted.run();
+                }
+
             }
         });
     }
@@ -123,10 +130,14 @@ public class PipeTransport implements Tr
     }
 
     public void stop() throws Exception {
+        stop(null);
+    }
+    public void stop(Runnable onCompleted) throws Exception {
         if( connected ) {
             peer.dispatchSource.merge(EOF_TOKEN);
         }
         if( dispatchSource!=null ) {
+            dispatchSource.setDisposer(onCompleted);
             dispatchSource.release();
             dispatchSource = null;
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java Wed Jul  7 03:46:57 2010
@@ -75,6 +75,9 @@ public class PipeTransportServer impleme
     }
 
     public void start() throws Exception {
+        start(null);
+    }
+    public void start(Runnable onCompleted) throws Exception {
         acceptSource = Dispatch.createSource(EventAggregators.<PipeTransport>linkedList(), dispatchQueue);
         acceptSource.setEventHandler(new Runnable() {
             public void run() {
@@ -84,11 +87,18 @@ public class PipeTransportServer impleme
                 }
             }
         });
+        if( onCompleted!=null ) {
+            dispatchQueue.execute(onCompleted);
+        }
     }
 
     public void stop() throws Exception {
-        acceptSource.release();
+        stop(null);
+    }
+    public void stop(Runnable onCompleted) throws Exception {
         PipeTransportFactory.unbind(this);
+        acceptSource.setDisposer(onCompleted);
+        acceptSource.release();
     }
 
     public void setConnectURI(URI connectURI) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java Wed Jul  7 03:46:57 2010
@@ -19,18 +19,35 @@ package org.apache.activemq;
 
 /**
  * The core lifecyle interface for ActiveMQ components.
- *  
- * If there was a standard way to do so, it'd be good to register this 
- * interface with Spring so it treats the start/stop methods as those of
- * {@link org.springframework.beans.factory.InitializingBean} 
- * and {@link org.springframework.beans.factory.DisposableBean}
- * 
+ *
  * @version $Revision: 1.1 $
  */
 public interface Service {
 
+    /**
+     * Starts the service.  No guarantee is given that the service has fully started
+     * by the time this method returns.
+     */
     void start() throws Exception;
-    
+
+    /**
+     * Starts the service.  Executes the onComplete runnable once the service has fully started up.
+     *
+     * @param onComplete my be set to null if not interested in a callback.
+     */
+    void start(Runnable onComplete) throws Exception;
+
+    /**
+     * Stops the service.  No guarantee is given that the service has fully stopped
+     * by the time this method returns.
+     */
     void stop() throws Exception;
-    
+
+    /**
+     * Stops the service.  Executes the onComplete runnable once the service has fully stopped.
+     *
+     * @param onComplete my be set to null if not interested in a callback.
+     */
+    void stop(Runnable onComplete) throws Exception;
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/pom.xml?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/pom.xml Wed Jul  7 03:46:57 2010
@@ -135,7 +135,6 @@
     <module>activemq-tcp</module>
     <module>activemq-hawtdb</module>
     <module>activemq-jaxb</module>
-    <module>activemq-scala</module>
     <module>activemq-stomp</module>
   </modules>