You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:47:37 UTC

svn commit: r961086 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/ activemq-broker/src/test/scala/org/apache/active...

Author: chirino
Date: Wed Jul  7 03:47:36 2010
New Revision: 961086

URL: http://svn.apache.org/viewvc?rev=961086&view=rev
Log:
mostly work related to lifecycle/cleanup

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961086&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul  7 03:47:36 2010
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import org.apache.activemq.Service
+import org.fusesource.hawtdispatch.DispatchQueue
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+/**
+ * <p>
+ * The BaseService provides helpers for dealing with async service state.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait BaseService extends Service {
+
+  /** Base type of the lifecycle states; toString yields the simple class name. */
+  sealed class State {
+    override def toString = getClass.getSimpleName
+  }
+
+  /** Mixed into the transitional states to collect completion callbacks. */
+  trait CallbackSupport {
+    var callbacks:List[Runnable] = Nil
+    // null is ignored so that "no callback" can be passed straight through
+    def << (r:Runnable) = if(r!=null) { callbacks ::= r }
+    def done = callbacks.foreach(_.run)
+  }
+
+  case class CREATED extends State
+  case class STARTING extends State with CallbackSupport
+  case class STARTED extends State
+  case class STOPPING extends State with CallbackSupport
+  case class STOPPED extends State
+
+  /** The queue on which start/stop requests are executed. */
+  val dispatchQueue:DispatchQueue
+
+  final def start() = start(null)
+  final def stop() = stop(null)
+
+  protected var _serviceState:State = CREATED()
+  protected def serviceState = _serviceState
+
+  /**
+   * Reports an illegal transition by printing a stack trace; never throws.
+   * AssertionError is an Error, not an Exception, so it is printed directly.
+   */
+  private def error(msg:String) {
+    new AssertionError(msg).printStackTrace
+  }
+
+  /** Handles a start request on the dispatchQueue; onCompleted (may be null) runs when done. */
+  final def start(onCompleted:Runnable) = ^{
+    def do_start = {
+      val state = STARTING()
+      state << onCompleted
+      _serviceState = state
+      _start(^{
+        _serviceState = STARTED()
+        state.done
+      })
+    }
+    def done = if( onCompleted!=null ) { onCompleted.run }
+    _serviceState match {
+      case x:CREATED =>
+        do_start
+      case x:STOPPED =>
+        do_start
+      case state:STARTING =>
+        // a start is already in progress: complete together with it
+        state << onCompleted
+      case state:STARTED =>
+        done
+      case state =>
+        done
+        error("start should not be called from state: "+state);
+    }
+  } ->: dispatchQueue
+
+  /** Handles a stop request on the dispatchQueue; onCompleted (may be null) runs when done. */
+  final def stop(onCompleted:Runnable) = ^{
+    def done = if( onCompleted!=null ) { onCompleted.run }
+    _serviceState match {
+      case x:STARTED =>
+        val state = STOPPING()
+        state << onCompleted
+        _serviceState = state
+        _stop(^{
+          _serviceState = STOPPED()
+          state.done
+        })
+      case state:STOPPING =>
+        // a stop is already in progress: complete together with it
+        state << onCompleted
+      case state:STOPPED =>
+        done
+      case state =>
+        done
+        error("stop should not be called from state: "+state);
+    }
+  } ->: dispatchQueue
+
+  /** Implemented by subclasses; the state only advances when onCompleted is run. */
+  protected def _start(onCompleted:Runnable)
+  protected def _stop(onCompleted:Runnable)
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:47:36 2010
@@ -29,6 +29,13 @@ import org.fusesource.hawtdispatch.{Disp
 import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
 import java.util.concurrent.{TimeUnit, CountDownLatch}
 
+/**
+ * <p>
+ * The BrokerFactory creates Broker objects from a URI.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object BrokerFactory {
 
     val BROKER_FACTORY_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/");
@@ -66,62 +73,6 @@ object BrokerFactory {
 
 }
 
-class CompletionTracker(val queue:DispatchQueue=getGlobalQueue) {
-  private[this] val tasks = new HashSet[Runnable]()
-  private[this] var _callback:Runnable = null
-  queue.retain
-
-  def task(name:String="unknown"):Runnable = {
-    val rc = new Runnable() {
-      def run():Unit = {
-        tasks.synchronized {
-          if( tasks.remove(this) ) {
-            if( tasks.isEmpty && _callback!=null ) {
-              _callback ->: queue
-              queue.release
-            }
-          }
-        }
-      }
-      override def toString = name
-    }
-    tasks.synchronized {
-      if( _callback!=null ) {
-        throw new IllegalStateException("all tasks should be created before setting the callback");
-      }
-      tasks.add(rc)
-    }
-    return rc
-  }
-
-  def callback(handler: =>Unit ) {
-    tasks.synchronized {
-      _callback = handler _
-      if(  tasks.isEmpty ) {
-        _callback ->: queue
-        queue.release
-      }
-    }
-  }
-
-  def await() = {
-    val latch =new CountDownLatch(1)
-    callback {
-      latch.countDown
-    }
-    latch.await
-  }
-
-  def await(timeout:Long, unit:TimeUnit) = {
-    val latch = new CountDownLatch(1)
-    callback {
-      latch.countDown
-    }
-    latch.await(timeout, unit)
-  }
-
-  override def toString = tasks.synchronized { "waiting on: "+tasks }
-}
 
 object BufferConversions {
 
@@ -145,8 +96,19 @@ object BrokerConstants extends Log {
   val UNKNOWN = "UNKNOWN"
   
   val DEFAULT_VIRTUAL_HOST_NAME = new AsciiBuffer("default")
+
+  val STICK_ON_THREAD_QUEUES = true
 }
 
+/**
+ * <p>
+ * A Broker is the parent object of all services associated with the server side of
+ * a message passing system.  It keeps track of all running connections,
+ * virtual hosts and associated messaging destinations.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class Broker() extends Service with DispatchLogging {
   
   import BrokerConstants._
@@ -174,7 +136,10 @@ class Broker() extends Service with Disp
   def stop(onComplete:Runnable) = runtime.stop(onComplete)
 
   val dispatchQueue = createQueue("broker");
-  dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+
+  if( STICK_ON_THREAD_QUEUES ) {
+    dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+  }
 
   def addVirtualHost(host: VirtualHost) = {
     if (host.names.isEmpty) {
@@ -210,6 +175,10 @@ class Broker() extends Service with Disp
         var connection = new BrokerConnection(Broker.this)
         connection.transport = transport
         connection.dispatchQueue.retain
+        if( STICK_ON_THREAD_QUEUES ) {
+          connection.dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+        }
+
         clientConnections.add(connection)
         try {
           connection.start()
@@ -290,7 +259,7 @@ class Broker() extends Service with Disp
 
         state = STARTING
 
-        val tracker = new CompletionTracker(dispatchQueue)
+        val tracker = new CompletionTracker("broker startup", dispatchQueue)
         for (virtualHost <- virtualHosts.values) {
           virtualHost.start(tracker.task("virtual host: "+virtualHost))
         }
@@ -315,45 +284,61 @@ class Broker() extends Service with Disp
     def stop(onCompleted:Runnable): Unit = ^ {
       if (state == RUNNING) {
         state = STOPPING
-        dispatchQueue.setDisposer(^{
-          if( onCompleted!=null ) {
-            state = STOPPED;
-            onCompleted.run
-          }
-        })
+        val tracker = new CompletionTracker("broker shutdown", dispatchQueue)
 
+        // Stop accepting connections..
         for (server <- transportServers) {
-          stopService(server)
+          stopService(server,tracker)
         }
+
+        // Kill client connections..
         for (connection <- clientConnections) {
-          stopService(connection)
+          stopService(connection, tracker)
         }
+
+        // Shutdown the virtual host services
         for (virtualHost <- virtualHosts.values) {
-          stopService(virtualHost)
+          stopService(virtualHost, tracker)
         }
-        dispatchQueue.release
+
+        def stopped = {
+          state = STOPPED;
+
+        }
+
+        tracker.callback {
+          stopped
+          if( onCompleted!=null ) {
+            onCompleted.run
+          }
+        }
+
       }
     } ->: dispatchQueue
+    
   }
 
 
+
   /**
    * Helper method to help stop broker services and log error if they fail to start.
    * @param server
    */
-  private def stopService(server: Service): Unit = {
+  private def stopService(service: Service, tracker:CompletionTracker): Unit = {
     try {
-      server.stop
+      service.stop(tracker.task(service.toString))
     } catch {
       case e: Exception => {
-        warn("Could not stop " + server + ": " + e)
-        debug("Could not stop " + server + " due to: ", e)
+        warn(e, "Could not stop " + service + ": " + e)
       }
     }
   }
 }
 
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait QueueLifecyleListener {
 
     /**

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala?rev=961086&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala Wed Jul  7 03:47:36 2010
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.HashSet
+import org.fusesource.hawtdispatch.DispatchQueue
+import org.fusesource.hawtdispatch.ScalaDispatch._
+
+object CompletionTracker extends Log
+
+/**
+ * <p>
+ * A CompletionTracker is used to track multiple async processing tasks and
+ * call a callback once they all complete.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class CompletionTracker(val name:String, val parent:DispatchQueue=getGlobalQueue) extends Logging {
+
+  override protected def log = CompletionTracker
+
+  private[this] val tasks = new HashSet[Runnable]()
+  private[this] var _callback:Runnable = null
+  // tasks and _callback are only mutated from blocks dispatched to this queue
+  val queue = parent.createSerialQueue("tracker: "+name);
+
+  /** Registers a task; run the returned Runnable to mark that task completed. */
+  def task(name:String="unknown"):Runnable = {
+    val rc = new Runnable() {
+      def run = {
+        trace("completed task: %s", name)
+        remove(this)
+      }
+      override def toString = name
+    }
+    ^ {
+      assert(_callback==null)
+      tasks.add(rc)
+    }  ->: queue
+    rc
+  }
+
+  /** Sets the handler to run once every registered task has completed. */
+  def callback(handler: =>Unit ) {
+    val start = System.currentTimeMillis
+    ^ {
+      _callback = handler _
+      checkDone()
+    }  ->: queue
+
+    // logs what is still pending once a second until checkDone clears _callback
+    def displayStatus = {
+      if( _callback!=null ) {
+        val duration = (System.currentTimeMillis-start)/1000
+        info("%s is taking a long time (%d seconds). Waiting on %s", name, duration, tasks)
+        schedualCheck
+      }
+    }
+    def schedualCheck:Unit = queue.dispatchAfter(1, TimeUnit.SECONDS, ^{displayStatus})
+    schedualCheck
+  }
+
+  private def remove(r:Runnable) = ^{
+    if( tasks.remove(r) ) {
+      checkDone()
+    }
+  } ->: queue
+
+  /** Fires the callback when no tasks remain; otherwise traces what is still missing. */
+  private def checkDone() = {
+    if( tasks.isEmpty && _callback!=null ) {
+      trace("executing callback for %s", name)
+      _callback ->: queue
+      _callback = null
+      queue.release
+    } else {
+      if( _callback==null ) trace("still waiting for callback to be set")
+      if( !tasks.isEmpty ) trace("still waiting for tasks %s", tasks)
+    }
+  }
+
+  /** Blocks the calling thread until all tasks have completed. */
+  def await() = {
+    val latch =new CountDownLatch(1)
+    callback { latch.countDown }
+    latch.await
+  }
+
+  /** Like await(), but gives up after the timeout; returns false if it elapsed first. */
+  def await(timeout:Long, unit:TimeUnit) = {
+    val latch = new CountDownLatch(1)
+    callback { latch.countDown }
+    latch.await(timeout, unit)
+  }
+
+  // NOTE(review): the other accesses to tasks do not take this lock; confirm it is safe
+  override def toString = tasks.synchronized { name+" waiting on: "+tasks }
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:47:36 2010
@@ -28,43 +28,45 @@ import _root_.org.fusesource.hawtdispatc
 import java.util.concurrent.atomic.AtomicLong
 import org.fusesource.hawtdispatch.Dispatch
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object Connection extends Log {
   val id_generator = new AtomicLong()
   def next_id = "connection:"+id_generator.incrementAndGet
 }
 
-abstract class Connection() extends TransportListener with Service  with DispatchLogging {
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class Connection() extends TransportListener with BaseService  with DispatchLogging {
 
   override protected def log = Connection
 
   import Connection._
   val id = next_id
   val dispatchQueue = createQueue(id)
-  dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
   
-  var name = "connection"
-  var stopped = false;
+  def stopped = serviceState match {
+    case STOPPED() | STOPPING() => true
+    case _ => false
+  }
+
   var transport:Transport = null
 
-  def start() = start(null)
+  override def toString = id
 
-  def start(onCompleted:Runnable) = {
+  override protected def _start(onCompleted:Runnable) = {
     transport.setDispatchQueue(dispatchQueue);
     transport.setTransportListener(Connection.this);
     transport.start(onCompleted)
   }
 
-  def stop() = stop(null)
-
-  def stop(onCompleted:Runnable) = {
-    if( !stopped ) {
-      stopped=true
-      transport.stop()
-      dispatchQueue.setDisposer(onCompleted)
-      dispatchQueue.release
-    }
+  override protected def _stop(onCompleted:Runnable) = {
+    transport.stop(onCompleted)
   }
 
+
   def onTransportFailure(error:IOException) = {
     if (!stopped) {
         onFailure(error);
@@ -84,23 +86,26 @@ abstract class Connection() extends Tran
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class BrokerConnection(val broker: Broker) extends Connection {
 
   var protocol = "stomp"
   var protocolHandler: ProtocolHandler = null;
 
-  override def start(onCompleted:Runnable) = {
+  override protected  def _start(onCompleted:Runnable) = {
     broker.dispatchQueue.retain
     protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
     protocolHandler.setConnection(this);
-    super.start(onCompleted)
+    super._start(onCompleted)
   }
 
-  override def stop(onCompleted:Runnable) = {
-    if( !stopped ) {
-      broker.runtime.stopped(this)
-      broker.dispatchQueue.release
-      super.stop(onCompleted)
-    }
+  override protected def _stop(onCompleted:Runnable) = {
+    // No "!stopped" guard here: BaseService.stop() has already moved the state to
+    // STOPPING, so stopped is true and the guard would skip the whole shutdown.
+    broker.runtime.stopped(this)
+    broker.dispatchQueue.release
+    super._stop(onCompleted)
   }
 
@@ -120,8 +125,14 @@ class BrokerConnection(val broker: Broke
   override def onTransportFailure(error: IOException) = protocolHandler.onTransportFailure(error)
 }
 
-class ProtocolException(message:String, e:Throwable=null) extends Exception(message, e) 
-
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ProtocolException(message:String, e:Throwable=null) extends Exception(message, e)
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class MultiProtocolHandler extends ProtocolHandler {
 
   var connected = false
@@ -156,6 +167,9 @@ class MultiProtocolHandler extends Proto
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object ProtocolHandlerFactory {
     val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/protocol/");
 
@@ -164,6 +178,9 @@ object ProtocolHandlerFactory {
     }
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait ProtocolHandler extends TransportListener {
 
   var connection:BrokerConnection = null;
@@ -186,6 +203,9 @@ trait ProtocolHandler extends TransportL
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait ConsumerContext { // extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
 
     def getConsumerId() : String

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:47:36 2010
@@ -24,16 +24,25 @@ import _root_.org.fusesource.hawtdispatc
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import org.apache.activemq.transport.Transport
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait DeliveryProducer {
   def collocate(queue:DispatchQueue):Unit
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait DeliverySession {
   val consumer:DeliveryConsumer
   def deliver(delivery:Delivery)
   def close:Unit
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait DeliveryConsumer extends Retained {
   def matches(message:Delivery)
   val queue:DispatchQueue;
@@ -43,6 +52,8 @@ trait DeliveryConsumer extends Retained 
 /**
  * Abstracts wire protocol message implementations.  Each wire protocol
  * will provide it's own type of Message.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait Message {
 
@@ -84,6 +95,9 @@ trait Message {
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object Delivery {
   def apply(o:Delivery) = new Delivery(o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
 }
@@ -385,11 +399,17 @@ case class Delivery (
 //    }
 //}
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait DeliverySink {
   def full:Boolean
   def send(delivery:Delivery):Unit
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class TransportDeliverySink(var transport:Transport) extends DeliverySink {
   def full:Boolean = transport.isFull
   def send(delivery:Delivery) = transport.oneway(delivery.message, delivery)
@@ -436,6 +456,9 @@ class DeliveryBuffer(var maxSize:Int=102
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class DeliveryOverflowBuffer(val delivery_buffer:DeliverySink) extends DeliverySink {
 
   private var overflow = new LinkedList[Delivery]()
@@ -472,6 +495,9 @@ class DeliveryOverflowBuffer(val deliver
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class DeliverySessionManager(val sink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
 
   var sessions = List[SessionServer]()
@@ -516,12 +542,12 @@ class DeliverySessionManager(val sink:De
 
     class SessionClient() extends DeliveryOverflowBuffer(sink) {
 
-      producer_queue.retain
       val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
       credit_adder.setEventHandler(^{
         internal_credit(credit_adder.getData.intValue)
       });
       credit_adder.resume
+      source.retain
 
       private var credits = 0;
 
@@ -530,16 +556,18 @@ class DeliverySessionManager(val sink:De
       ///////////////////////////////////////////////////
       def close = {
         credit_adder.release
-        producer_queue.release
+        source.release
       }
 
       override def full = credits <= 0
 
       override protected def send_to_delivery_buffer(value:Delivery) = {
         var delivery = Delivery(value)
+        credit_adder.retain
         delivery.setDisposer(^{
           // This is called from the server/consumer thread
           credit_adder.merge(delivery.size);
+          credit_adder.release
         })
         internal_credit(-delivery.size)
         source.merge(delivery)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Wed Jul  7 03:47:36 2010
@@ -19,6 +19,9 @@ package org.apache.activemq.apollo.broke
 import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
 import BufferConversions._
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class ParserOptions {
   var defaultDomain:AsciiBuffer = null
   var queuePrefix:AsciiBuffer = null
@@ -27,6 +30,9 @@ class ParserOptions {
   var tempTopicPrefix:AsciiBuffer = null
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object DestinationParser {
 
     /**
@@ -86,6 +92,9 @@ object DestinationParser {
     }
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 case class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
 
   def getDestinations():Array[Destination] = null;
@@ -94,6 +103,10 @@ case class SingleDestination(var domain:
 
   override def toString() = ""+domain+":"+name
 }
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 case class MultiDestination(var destinations:Array[Destination]) extends Destination {
 
   def getDestinations():Array[Destination] = destinations;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul  7 03:47:36 2010
@@ -22,15 +22,25 @@ import _root_.org.apache.commons.logging
 import _root_.org.apache.commons.logging.{Log => Logger}
 import java.util.concurrent.atomic.AtomicLong
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait Log {
-  val log = LogFactory.getLog(getClass.getName)
+  val log = LogFactory.getLog(getClass.getName.stripSuffix("$"))
+
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class NamedLog(name:String) extends Log {
-  def this(clazz:Class[_]) = this(clazz.getName)
+  def this(clazz:Class[_]) = this(clazz.getName.stripSuffix("$"))
   override val log = LogFactory.getLog(name)
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object Logging {
   val exception_id_generator = new AtomicLong(System.currentTimeMillis)
   def next_exception_id = exception_id_generator.incrementAndGet.toHexString
@@ -215,6 +225,9 @@ trait Logging {
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait DispatchLogging extends Logging {
   import org.fusesource.hawtdispatch.ScalaDispatch._
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 03:47:36 2010
@@ -25,6 +25,9 @@ import java.util.HashMap
 import collection.JavaConversions
 import path.PathMap
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object Domain {
   val TOPIC_DOMAIN = new AsciiBuffer("topic");
   val QUEUE_DOMAIN = new AsciiBuffer("queue");
@@ -33,6 +36,9 @@ object Domain {
 }
 
 import Domain._
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class Domain {
 
   val targets = new PathMap[DeliveryConsumer]();
@@ -52,6 +58,9 @@ class Domain {
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object Router extends Log {
 
 }
@@ -67,6 +76,7 @@ object Router extends Log {
  * get the current set of consumers that are bound
  * to the destination. 
  *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class Router(var queue:DispatchQueue) extends DispatchLogging {
 
@@ -182,6 +192,9 @@ class Router(var queue:DispatchQueue) ex
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait Route extends Retained {
 
   val destination:Destination
@@ -195,6 +208,9 @@ trait Route extends Retained {
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
 
   override protected def log = Router

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jul  7 03:47:36 2010
@@ -21,6 +21,9 @@ import _root_.org.apache.activemq.filter
 import _root_.scala.collection.JavaConversions._
 import path.PathFilter
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 trait BrokerSubscription {
 
     def connect(consumer:ConsumerContext)
@@ -32,6 +35,9 @@ trait BrokerSubscription {
 }
 
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class CompositeSubscription(val destination:Destination, val subscriptions:List[BrokerSubscription] ) extends BrokerSubscription {
 
 
@@ -51,12 +57,18 @@ class CompositeSubscription(val destinat
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object WildcardQueueSubscription extends Log {
 
 }
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class WildcardQueueSubscription(val host:VirtualHost, val destination:Destination, val consumer:ConsumerContext) extends BrokerSubscription with QueueLifecyleListener with Logging {
 
-  protected def log = WildcardQueueSubscription
+  override protected def log = WildcardQueueSubscription
 
     var filter = PathFilter.parseFilter(destination.getName());
     val childSubs = new ArrayList[BrokerSubscription]();
@@ -112,6 +124,9 @@ class WildcardQueueSubscription(val host
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class TopicSubscription { // extends BrokerSubscription with DeliveryTarget {
   def matches(message:Delivery) = false
   def deliver(message:Delivery) = {}
@@ -246,6 +261,9 @@ class TopicSubscription { // extends Bro
 
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class DurableSubscription(val host:VirtualHost, val destination:Destination, val selector:BooleanExpression) { // extends BrokerSubscription with DeliveryTarget {
 
 //    private final IQueue<Long, MessageDelivery> queue;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 03:47:36 2010
@@ -27,8 +27,14 @@ import _root_.scala.collection.JavaConve
 import _root_.scala.reflect.BeanProperty
 import path.PathFilter
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object VirtualHost extends Log
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class VirtualHost() extends Service with Logging {
 
   override protected def log = VirtualHost
@@ -121,6 +127,10 @@ class VirtualHost() extends Service with
 
       database.stop();
       started = false;
+    if( onCompleted!=null ) {
+      onCompleted.run
+    }
+    
   }
 
   def createQueue(dest:Destination) :Queue = {
@@ -218,6 +228,9 @@ class VirtualHost() extends Service with
   }
 }
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class BrokerDatabase() {
 
   @BeanProperty
@@ -1558,9 +1571,15 @@ class BrokerDatabase() {
 
 
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class UserAlreadyConnectedException extends Exception
 
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class BrokerQueueStore { // implements QueueStore<Long, MessageDelivery> {
 // TODO:
 //    private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul  7 03:47:36 2010
@@ -35,6 +35,9 @@ import _root_.org.apache.activemq.transp
 
 import _root_.scala.collection.JavaConversions._
 
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 object VMTransportFactory extends Log {
   val DEFAULT_PIPE_NAME = BrokerConstants.DEFAULT_VIRTUAL_HOST_NAME.toString();
 }
@@ -43,8 +46,7 @@ object VMTransportFactory extends Log {
  * Implements the vm transport which behaves like the pipe transport except that
  * it can start embedded brokers up on demand.
  *
- * @author chirino
- *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class VMTransportFactory extends PipeTransportFactory with Logging {
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 03:47:36 2010
@@ -29,14 +29,14 @@ import org.apache.activemq.util.buffer.A
 import org.apache.activemq.broker.store.{Store, StoreFactory}
 import java.io.{File, IOException}
 import java.util.ArrayList
-import org.scalatest.Informer
 import org.fusesource.hawtdispatch.BaseRetained
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import org.apache.activemq.apollo.broker._
-
+import org.scalatest._
 
 object BaseBrokerPerfSupport {
-  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "1"))
+  var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
   var IO_WORK_AMOUNT = 0
   var FANIN_COUNT = 10
   var FANOUT_COUNT = 10
@@ -48,21 +48,22 @@ object BaseBrokerPerfSupport {
   var PERSISTENT = false;
   var DURABLE = false;
 
-}
-
-abstract class BaseBrokerPerfSupport extends FunSuiteSupport {
-  import BaseBrokerPerfSupport._
+  // Set to test against ptp queues instead of topics:
+  var PTP = false;
 
   // Set to put senders and consumers on separate brokers.
-  protected var multibroker = false;
-
-  // Set to mockup up ptp:
-  protected var ptp = false;
+  var MULTI_BROKER = false;
 
   // Set to use tcp IO
-  protected var tcp = true;
+  protected var TCP = true;
+
   // set to force marshalling even in the NON tcp case.
-  protected var forceMarshalling = true;
+  protected var FORCE_MARSHALLING = true;
+}
+
+abstract class BaseBrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach {
+  import BaseBrokerPerfSupport._
+
 
   protected var sendBrokerBindURI: String = null
   protected var receiveBrokerBindURI: String = null
@@ -85,10 +86,26 @@ abstract class BaseBrokerPerfSupport ext
   val producers = new ArrayList[RemoteProducer]()
   val consumers = new ArrayList[RemoteConsumer]()
 
+  var spread_sheet_stats:List[(String, Any)] = Nil
+
+
+  override protected def beforeEach() = {
+    totalProducerRate.removeAllMetrics
+    totalConsumerRate.removeAllMetrics
+    brokers.clear
+    producers.clear
+    consumers.clear
+    stopping.set(false)
+    rcvBroker=null
+    sendBroker=null
+    producerCount = 0
+    consumerCount = 0
+    destCount =0
+  }
 
   override protected def beforeAll(configMap: Map[String, Any]) = {
     super.beforeAll(configMap)
-    if (tcp) {
+    if (TCP) {
       sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
       receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
 
@@ -97,7 +114,7 @@ abstract class BaseBrokerPerfSupport ext
     } else {
       sendBrokerConnectURI = "pipe://SendBroker";
       receiveBrokerConnectURI = "pipe://ReceiveBroker";
-      if (forceMarshalling) {
+      if (FORCE_MARSHALLING) {
         sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
         receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
       } else {
@@ -107,12 +124,30 @@ abstract class BaseBrokerPerfSupport ext
     }
   }
 
+
+  override protected def afterEach() = {
+    println("Spread sheet stats:")
+    println(spread_sheet_stats.map(_._1).mkString(","))
+    println(spread_sheet_stats.map(_._2).mkString(","))
+  }
+
+  override protected def afterAll() = {
+    println("Spread sheet stats:")
+    println(spread_sheet_stats.map(_._1).mkString(","))
+    println(spread_sheet_stats.map(_._2).mkString(","))
+  }
+
   def getBrokerWireFormat() = "multi"
 
   def getRemoteWireFormat(): String
 
-  if (!ptp) {
-    test("1 producer -> 1 destination -> 0 consumers") {
+  /**
+   * Used to benchmark the raw speed of sending messages one way.
+   * Divide by 2 and compare against 1-1-1 to figure out what the broker dispatching
+   * overhead is.
+   */
+  if (!PTP) {
+    test("1->1->0") {
       producerCount = 1;
       destCount = 1;
 
@@ -128,14 +163,16 @@ abstract class BaseBrokerPerfSupport ext
     }
   }
 
-  test("1 producer -> 1 destination -> 1 consumers") {
+  /**
+   * The baseline of the performance of going from 1 producer to 1 consumer.
+   */
+  test("1->1->1") {
+    println(testName)
     producerCount = 1;
     destCount = 1;
     consumerCount = 1;
 
     createConnections();
-//    producers.get(0).thinkTime = 500000*1000;
-
     // Start 'em up.
     startClients();
     try {
@@ -145,10 +182,14 @@ abstract class BaseBrokerPerfSupport ext
     }
   }
 
-  test(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT)) {
-    producerCount = FANIN_COUNT;
-    consumerCount = FANOUT_COUNT;
-    destCount = 1;
+  /**
+   * To compare against the performance of the 1-1-1 case. If you have
+   * linear scalability, then this should be twice as fast.
+   */
+  test("2->2->2") {
+    producerCount = 2;
+    destCount = 2;
+    consumerCount = 2;
 
     createConnections();
 
@@ -161,10 +202,13 @@ abstract class BaseBrokerPerfSupport ext
     }
   }
 
-  test(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT)) {
+  /**
+   * To see how a destination performs under high producer and consumer contention.
+   */
+  test(format("%d->1->%d", FANIN_COUNT, FANOUT_COUNT)) {
     producerCount = FANIN_COUNT;
+    consumerCount = FANOUT_COUNT;
     destCount = 1;
-    consumerCount = 1;
 
     createConnections();
 
@@ -177,10 +221,13 @@ abstract class BaseBrokerPerfSupport ext
     }
   }
 
-  test(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT)) {
-    producerCount = 1;
+  /**
+   * To see how a destination performs under high producer contention.
+   */
+  test(format("%d->1->1", FANIN_COUNT)) {
+    producerCount = FANIN_COUNT;
     destCount = 1;
-    consumerCount = FANOUT_COUNT;
+    consumerCount = 1;
 
     createConnections();
 
@@ -193,10 +240,13 @@ abstract class BaseBrokerPerfSupport ext
     }
   }
 
-  test("2 producer -> 2 destination -> 2 consumers") {
-    producerCount = 2;
-    destCount = 2;
-    consumerCount = 2;
+  /**
+   * To see how a destination performs under high consumer contention.
+   */
+  test(format("1->1->%d", FANOUT_COUNT)) {
+    producerCount = 1;
+    destCount = 1;
+    consumerCount = FANOUT_COUNT;
 
     createConnections();
 
@@ -209,7 +259,11 @@ abstract class BaseBrokerPerfSupport ext
     }
   }
 
-  test("10 producers -> 10 destinations -> 10 consumers") {
+  /**
+   * To test how an overload situation affects scalability.  Compare to the
+   * scalability trend of 1-1-1 to 2-2-2
+   */
+  test("10->10->10") {
     producerCount = 10;
     destCount = 10;
     consumerCount = 10;
@@ -226,15 +280,18 @@ abstract class BaseBrokerPerfSupport ext
   }
 
   /**
-   * Tests 2 producers sending to 1 destination with 2 consumres, but with
-   * consumers set to select only messages from each producer. 1 consumers is
-   * set to slow, the other producer should be able to send quickly.
+   * Tests 1 producer sending to 1 destination with 1 slow and 1 fast consumer.
+   *
+   * queue case: the producer should not slow down since it can dispatch to the
+   *             fast consumer
+   *
+   * topic case: the producer should slow down since it HAS to dispatch to the
+   *             slow consumer.
    *
-   * @throws Exception
    */
-  test("2 producer -> 2 destination -> 2 slow consumers") {
+  test("1->1->[1 slow,1 fast]") {
     producerCount = 2;
-    destCount = 2;
+    destCount = 1;
     consumerCount = 2;
 
     createConnections();
@@ -249,7 +306,7 @@ abstract class BaseBrokerPerfSupport ext
     }
   }
 
-  test("2 producer -> 2 destination -> 2 selector consumers") {
+  test("2->2->[1,1 selecting]") {
     producerCount = 2;
     destCount = 2;
     consumerCount = 2;
@@ -278,7 +335,7 @@ abstract class BaseBrokerPerfSupport ext
    *
    * @throws Exception
    */
-  test("1 high and 1 normal priority producer -> 1 destination -> 1 consumer") {
+  test("[1 high, 1 normal]->1->1") {
     producerCount = 2;
     destCount = 1;
     consumerCount = 1;
@@ -296,7 +353,7 @@ abstract class BaseBrokerPerfSupport ext
       println("Checking rates...");
       for (i <- 0 until PERFORMANCE_SAMPLES) {
         var p = new Period();
-        Thread.sleep(1000 * 5);
+        Thread.sleep(SAMPLE_PERIOD);
         println(producer.rate.getRateSummary(p));
         println(totalProducerRate.getRateSummary(p));
         println(totalConsumerRate.getRateSummary(p));
@@ -315,7 +372,7 @@ abstract class BaseBrokerPerfSupport ext
    *
    * @throws Exception
    */
-  test("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer") {
+  test("[1 high, 1 mixed, 1 normal]->1->1") {
     producerCount = 2;
     destCount = 1;
     consumerCount = 1;
@@ -335,7 +392,7 @@ abstract class BaseBrokerPerfSupport ext
       println("Checking rates...");
       for (i <- 0 until PERFORMANCE_SAMPLES) {
         var p = new Period();
-        Thread.sleep(1000 * 5);
+        Thread.sleep(SAMPLE_PERIOD);
         println(producer.rate.getRateSummary(p));
         println(totalProducerRate.getRateSummary(p));
         println(totalConsumerRate.getRateSummary(p));
@@ -349,20 +406,44 @@ abstract class BaseBrokerPerfSupport ext
   }
 
   def reportRates() = {
-    println("Checking "+(if (ptp) "ptp" else "topic")+" rates...");
+    val best_sample = PERFORMANCE_SAMPLES/2
+
+    println("Checking "+(if (PTP) "ptp" else "topic")+" rates...");
     for (i <- 0 until PERFORMANCE_SAMPLES) {
       var p = new Period();
-      Thread.sleep(1000 * 5);
-      println(totalProducerRate.getRateSummary(p));
-      println(totalConsumerRate.getRateSummary(p));
+      Thread.sleep(SAMPLE_PERIOD);
+      if( producerCount > 0 ) {
+        println(totalProducerRate.getRateSummary(p));
+      }
+      if( consumerCount > 0 ) {
+        println(totalConsumerRate.getRateSummary(p));
+      }
+
+      if( i == best_sample ) {
+        if( producerCount > 0 ) {
+          spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: producer rate", totalProducerRate.total(p) ) :: Nil
+          if( producerCount > 1 ) {
+            spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: producer deviation", totalProducerRate.deviation ) :: Nil
+          }
+        }
+        if( consumerCount > 0 ) {
+          spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: consumer rate", totalConsumerRate.total(p) ) :: Nil
+          if( consumerCount > 1 ) {
+            spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: consumer deviation", totalConsumerRate.deviation ) :: Nil
+          }
+        }
+      }
+      
       totalProducerRate.reset();
       totalConsumerRate.reset();
     }
+
+
   }
 
   def createConnections() = {
 
-    if (multibroker) {
+    if (MULTI_BROKER) {
       sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
       rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
       brokers.add(sendBroker);
@@ -378,13 +459,13 @@ abstract class BaseBrokerPerfSupport ext
     var dests = new Array[Destination](destCount);
 
     for (i <- 0 until destCount) {
-      val domain = if (ptp) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+      val domain = if (PTP) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
       val name = new AsciiBuffer("dest" + (i + 1))
       var bean = new SingleDestination(domain, name)
       dests(i) = bean;
-      if (ptp) {
+      if (PTP) {
         sendBroker.defaultVirtualHost.createQueue(dests(i));
-        if (multibroker) {
+        if (MULTI_BROKER) {
           rcvBroker.defaultVirtualHost.createQueue(dests(i));
         }
       }
@@ -460,42 +541,38 @@ abstract class BaseBrokerPerfSupport ext
 
   private def stopServices() = {
     stopping.set(true);
-    val tracker = new CompletionTracker
+    val tracker = new CompletionTracker("test shutdown")
     for (broker <- brokers) {
-      broker.stop(tracker.task());
+      broker.stop(tracker.task("broker"));
     }
-    brokers.clear
     for (connection <- producers) {
-      connection.stop(tracker.task());
+      connection.stop(tracker.task(connection.toString));
     }
-    producers.clear
     for (connection <- consumers) {
-      connection.stop(tracker.task());
+      connection.stop(tracker.task(connection.toString));
     }
-    consumers.clear
     println("waiting for services to stop");
     tracker.await
-    stopping.set(false)
   }
 
   private def startBrokers() = {
-    val tracker = new CompletionTracker
+    val tracker = new CompletionTracker("test broker startup")
     for (broker <- brokers) {
-      broker.start(tracker.task());
+      broker.start(tracker.task("broker"));
     }
     tracker.await
   }
 
 
   private def startClients() = {
-    var tracker = new CompletionTracker
+    var tracker = new CompletionTracker("test consumer startup")
     for (connection <- consumers) {
-      connection.start(tracker.task());
+      connection.start(tracker.task(connection.toString));
     }
     tracker.await
-    tracker = new CompletionTracker
+    tracker = new CompletionTracker("test producer startup")
     for (connection <- producers) {
-      connection.start(tracker.task());
+      connection.start(tracker.task(connection.toString));
     }
     tracker.await
   }
@@ -510,13 +587,16 @@ abstract class RemoteConsumer extends Co
   var selector: String = null;
   var durable = false;
   var uri: String = null
+  var name:String = null
   var brokerPerfTest:BaseBrokerPerfSupport = null
 
-  override def start(onComplete:Runnable) = {
-    consumerRate.name("Consumer " + name + " Rate");
+  override protected def _start(onComplete:Runnable) = {
+    if( consumerRate.getName == null ) {
+      consumerRate.name("Consumer " + name + " Rate");
+    }
     totalConsumerRate.add(consumerRate);
     transport = TransportFactory.connect(uri);
-    super.start(onComplete);
+    super._start(onComplete);
   }
 
 
@@ -540,6 +620,7 @@ abstract class RemoteConsumer extends Co
 abstract class RemoteProducer extends Connection {
   val rate = new MetricCounter();
 
+  var name:String = null
   var messageIdGenerator: AtomicLong = null
   var priority = 0
   var persistentDelivery = false
@@ -564,7 +645,7 @@ abstract class RemoteProducer extends Co
     }
   }
 
-  override def start(onComplete:Runnable) = {
+  override protected def _start(onComplete:Runnable) = {
 
     if (payloadSize > 0) {
       var sb = new StringBuilder(payloadSize);
@@ -574,11 +655,13 @@ abstract class RemoteProducer extends Co
       filler = sb.toString();
     }
 
-    rate.name("Producer " + name + " Rate");
+    if( rate.getName == null ) {
+      rate.name("Producer " + name + " Rate");
+    }
     totalProducerRate.add(rate);
 
     transport = TransportFactory.connect(uri);
-    super.start(onComplete);
+    super._start(onComplete);
 
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala Wed Jul  7 03:47:36 2010
@@ -2,11 +2,11 @@ package org.apache.activemq.apollo.broke
 
 import _root_.org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import java.io.File
 import java.lang.String
 import collection.immutable.Map
 import org.apache.activemq.apollo.broker.Logging
+import org.scalatest._
 
 /**
  * @version $Revision : 1.1 $
@@ -15,6 +15,7 @@ import org.apache.activemq.apollo.broker
 abstract class FunSuiteSupport extends FunSuite with Logging with BeforeAndAfterAll {
   protected var _basedir = "."
 
+
   /**
    * Returns the base directory of the current project
    */
@@ -30,4 +31,22 @@ abstract class FunSuiteSupport extends F
     }
     debug("using basedir: " + _basedir)
   }
+
+  //
+  // Allows us to get the current test name.
+  //
+
+  val _testName = new ThreadLocal[String]();
+
+  def testName = _testName.get
+
+  protected override def runTest(testName: String, reporter: Reporter, stopper: Stopper, configMap: Map[String, Any], tracker: Tracker) = {
+    _testName.set(testName)
+    try {
+      super.runTest(testName, reporter, stopper, configMap, tracker)
+    } finally {
+      _testName.remove
+    }
+  }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties Wed Jul  7 03:47:36 2010
@@ -19,13 +19,13 @@
 # The logging properties used during tests..
 #
 log4j.rootLogger=WARN, console, file
-log4j.logger.org.apache.activemq=DEBUG
+log4j.logger.org.apache.activemq=TRACE
 
 # Console will only display warnnings
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
-log4j.appender.console.threshold=DEBUG
+log4j.appender.console.threshold=TRACE
 
 # File appender will contain all info messages
 log4j.appender.file=org.apache.log4j.FileAppender

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:47:36 2010
@@ -99,7 +99,11 @@ class StompProtocolHandler extends Proto
 
   override def onTransportConnected() = {
     outboundChannel = new TransportDeliverySink(connection.transport) {
-      override def send(delivery: Delivery) = transport.oneway(delivery.message.asInstanceOf[StompFrameMessage].frame, delivery)
+      override def send(delivery: Delivery) = {
+        if( !connection.stopped ) {
+          transport.oneway(delivery.message.asInstanceOf[StompFrameMessage].frame, delivery)
+        }
+      }
     }
     connection.broker.runtime.getDefaultVirtualHost(
       queue.wrap { (host)=>
@@ -112,7 +116,6 @@ class StompProtocolHandler extends Proto
 
   override def onTransportDisconnected() = {
     if( !closed ) {
-      info("cleaning up resources")
       closed=true;
       if( producerRoute!=null ) {
         host.router.disconnect(producerRoute)
@@ -122,6 +125,7 @@ class StompProtocolHandler extends Proto
         host.router.unbind(consumer.destination, consumer::Nil)
         consumer=null
       }
+      info("stomp protocol resources released")
     }
   }
 
@@ -202,9 +206,11 @@ class StompProtocolHandler extends Proto
           connection.transport.suspendRead
           host.router.connect(destiantion, queue, producer) {
             (route) =>
-              connection.transport.resumeRead
-              producerRoute = route
-              send_via_route(producerRoute, frame)
+              if( !connection.stopped ) {
+                connection.transport.resumeRead
+                producerRoute = route
+                send_via_route(producerRoute, frame)
+              }
           }
         } else {
           // we can re-use the existing producer route

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 03:47:36 2010
@@ -67,7 +67,7 @@ object StompWireFormat extends Log {
 class StompWireFormat extends WireFormat with DispatchLogging {
 
   import StompWireFormat._
-  protected def log: Log = StompWireFormat
+  override protected def log: Log = StompWireFormat
 
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
   implicit def wrap(x: Byte) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:47:36 2010
@@ -31,6 +31,7 @@ import _root_.org.fusesource.hawtdispatc
 import org.fusesource.hawtdispatch.BaseRetained
 
 class StompBrokerPerfTest extends BaseBrokerPerfSupport {
+  println(getClass.getClassLoader.getResource("log4j.properties"));
 
     override def createProducer() =  new StompRemoteProducer()
     override def createConsumer() = new StompRemoteConsumer()

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:47:36 2010
@@ -20,6 +20,8 @@ import org.apache.activemq.transport.Tra
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.hawtdispatch.DispatchQueue;
 import org.fusesource.hawtdispatch.DispatchSource;
@@ -43,6 +45,12 @@ import static org.apache.activemq.transp
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class TcpTransport implements Transport {
+
+    static {
+        System.out.println(TcpTransport.class.getClassLoader().getResource("log4j.properties"));
+    }
+    private static final Log LOG = LogFactory.getLog(TcpTransport.class);
+
     private Map<String, Object> socketOptions;
 
     enum SocketState {
@@ -209,8 +217,9 @@ public class TcpTransport implements Tra
         });
         readSource.setCancelHandler(new Runnable() {
             public void run() {
-                readSource.release();
-                releaseResources();
+                trace("Read canceled");
+                writeSource.cancel();
+                trace("Canceling write");
             }
         });
 
@@ -228,8 +237,10 @@ public class TcpTransport implements Tra
         });
         writeSource.setCancelHandler(new Runnable() {
             public void run() {
-                writeSource.release();
-                releaseResources();
+                trace("Write canceled");
+                writeSource.cancel();
+                trace("Disposeing");
+                dispose();
             }
         });
 
@@ -241,35 +252,46 @@ public class TcpTransport implements Tra
     public void stop() throws Exception {
         stop(null);
     }
-    public void stop(Runnable onCompleted) throws Exception {
+    public void stop(final Runnable onCompleted) throws Exception {
         if (transportState != RUNNING) {
-            throw new IllegalStateException("stop can only be used from the started state");
+            throw new IllegalStateException("stop can only be used from the started state but was "+transportState);
         }
+        trace("Canceling read");
         transportState = DISPOSED;
+        writeSource.setDisposer(new Runnable(){
+            public void run() {
+                trace("running callback: "+onCompleted);
+                if( onCompleted!=null ) {
+                    onCompleted.run();
+                }
+            }
+        });
         readSource.cancel();
-        writeSource.setDisposer(onCompleted);
-        writeSource.cancel();
     }
 
-    private void releaseResources() {
-        if( writeSource.isReleased() && writeSource.isReleased() ) {
-            try {
-                channel.close();
-            } catch (IOException ignore) {
-            }
-            listener.onTransportDisconnected();
-            OneWay oneWay = outbound.poll();
-            while (oneWay != null) {
-                if (oneWay.retained != null) {
-                    oneWay.retained.release();
-                }
-                oneWay = outbound.poll();
-            }
-            setDispatchQueue(null);
-            next_outbound_buffer = null;
-            outbound_buffer = null;
-            this.wireformat = null;
+    private void dispose() {   // closes the channel, notifies the listener, releases both sources and the queue, then clears buffer/wireformat references
+
+        assert dispatchQueue!=null;
+        assert Dispatch.getCurrentQueue() == dispatchQueue;   // expected to be running on this transport's own dispatch queue
+
+        try {
+            channel.close();
+        } catch (IOException ignore) {   // a failure while closing the channel is deliberately ignored
         }
+        listener.onTransportDisconnected();   // NOTE(review): the disabled loop below released each queued OneWay's retained resource - confirm nothing leaks without it
+//        OneWay oneWay = outbound.poll();
+//        while (oneWay != null) {
+//            if (oneWay.retained != null) {
+//                oneWay.retained.release();
+//            }
+//            oneWay = outbound.poll();
+//        }
+        readSource.release();
+        writeSource.release();
+        dispatchQueue.release();
+        next_outbound_buffer = null;
+        outbound_buffer = null;
+        this.wireformat = null;
     }
 
     public void onTransportFailure(IOException error) {
@@ -323,6 +345,7 @@ public class TcpTransport implements Tra
      * @retruns true if the outbound has been drained of all objects and there are no in progress writes.
      */
     private boolean drainOutbound() {
+        assert Dispatch.getCurrentQueue() == dispatchQueue;
         try {
 
             while (socketState == CONNECTED) {
@@ -433,12 +456,29 @@ public class TcpTransport implements Tra
         return null;
     }
 
+    /**
+     * Reports an IOException("Not connected.") through onTransportFailure unless socketState is CONNECTED.
+     * @return true when socketState is CONNECTED, false after the failure has been reported
+     */
+    private boolean assertConnected() {
+        if (socketState == CONNECTED) {
+            return true;
+        }
+        onTransportFailure(new IOException("Not connected."));
+        return false;
+    }
+
     public void suspendRead() {
-        readSource.suspend();
+        if( assertConnected() ) {
+            readSource.suspend();
+        }
     }
 
+
     public void resumeRead() {
-        readSource.resume();
+        if( assertConnected() ) {
+            readSource.resume();
+        }
     }
 
     public void reconnect(URI uri) {
@@ -490,4 +530,16 @@ public class TcpTransport implements Tra
         this.useLocalHost = useLocalHost;
     }
 
+
+    /**
+     * Logs message at trace level, prefixed with the dispatch queue's label when one is available.
+     */
+    private void trace(String message) {
+        if( LOG.isTraceEnabled() ) {
+            // Guard against a null dispatchQueue so that tracing itself cannot throw.
+            final String label = dispatchQueue==null ? null : dispatchQueue.getLabel();
+            LOG.trace(label==null ? message : label +" | "+message);
+        }
+    }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java Wed Jul  7 03:47:36 2010
@@ -41,6 +41,10 @@ public class MetricAggregator extends Me
         return metrics.remove(metric);
     }
 
+    public void removeAllMetrics() {   // clears the metrics collection held by this aggregator
+        metrics.clear();
+    }
+
     public Float average() {
         if (metrics.isEmpty()) {
             return null;
@@ -54,6 +58,31 @@ public class MetricAggregator extends Me
         return rc * 1.0f / count;
     }
 
+    /** Returns the population standard deviation of the metrics' counters, or null when there are none. */
+    public Float deviation() {
+        if (metrics.isEmpty()) {
+            return null;
+        }
+        final int n = metrics.size();
+        final long[] samples = new long[n];
+        long sum = 0;
+        for (int i = 0; i < n; i++) {
+            samples[i] = metrics.get(i).counter();
+            sum += samples[i];
+        }
+        // Second pass: the mean has to be known before the squared differences can be accumulated.
+        final double mean = (1.0 * sum) / n;
+        double squares = 0;
+        for (long sample : samples) {
+            squares += (sample - mean) * (sample - mean);
+        }
+        return (float) Math.sqrt(squares / n);
+    }
+
+    public Float total(Period p) {   // hands total() to p.rate(...); NOTE(review): presumably yields a per-period rate - confirm against Period
+        return p.rate(total());
+    }
+
     public long total() {
         long rc = 0;
         for (Metric metric : metrics) {