You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:15:33 UTC

svn commit: r961187 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-dto/src/main/java/org/apache/acti...

Author: chirino
Date: Wed Jul  7 04:15:31 2010
New Revision: 961187

URL: http://svn.apache.org/viewvc?rev=961187&view=rev
Log:
starting to expose broker stats via the web module.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
      - copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java
      - copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
      - copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java
      - copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
      - copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java
      - copied, changed from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala
    activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 04:15:31 2010
@@ -24,9 +24,11 @@ import org.fusesource.hawtdispatch.{Disp
 import org.fusesource.hawtbuf._
 import ReporterLevel._
 import AsciiBuffer._
-import org.apache.activemq.apollo.dto.{BrokerDTO}
 import collection.{JavaConversions, SortedMap}
-import JavaConversions._ 
+import JavaConversions._
+import org.apache.activemq.apollo.dto.{VirtualHostStatusDTO, ConnectorStatusDTO, BrokerStatusDTO, BrokerDTO}
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.activemq.apollo.util.LongCounter
 
 /**
  * <p>
@@ -89,6 +91,8 @@ object BufferConversions {
 
 object Broker extends Log {
 
+  val broker_id_counter = new AtomicLong()
+
   val STICK_ON_THREAD_QUEUES = true
 
   /**
@@ -156,9 +160,14 @@ class Broker() extends BaseService with 
     dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
   }
 
-  def id = config.id
+  val id = broker_id_counter.incrementAndGet
+  
+  val virtual_host_id_counter = new LongCounter
+  val connector_id_counter = new LongCounter
+  val connection_id_counter = new LongCounter
+
+  override def toString() = "broker: "+id
 
-  override def toString() = "broker: "+id 
 
   /**
    * Validates and then applies the configuration.
@@ -183,7 +192,7 @@ class Broker() extends BaseService with 
       dataDirectory = new File(config.basedir)
       defaultVirtualHost = null
       for (c <- config.virtualHosts) {
-        val host = new VirtualHost(this)
+        val host = new VirtualHost(this, virtual_host_id_counter.incrementAndGet)
         host.configure(c, this)
         virtualHosts += ascii(c.id)-> host
         // first defined host is the default virtual host
@@ -192,7 +201,7 @@ class Broker() extends BaseService with 
         }
       }
       for (c <- config.connectors) {
-        val connector = new Connector(this)
+        val connector = new Connector(this, connector_id_counter.incrementAndGet)
         connector.configure(c, this)
         connectors ::= connector
       }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 04:15:31 2010
@@ -33,8 +33,6 @@ import protocol.{ProtocolFactory, Protoc
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object Connection extends Log {
-  val id_generator = new AtomicLong()
-  def next_id = "connection:"+id_generator.incrementAndGet
 }
 
 /**
@@ -45,14 +43,11 @@ abstract class Connection() extends Defa
   override protected def log = Connection
 
   import Connection._
-  val id = next_id
-  val dispatchQueue = createQueue(id)
+  val dispatchQueue = createQueue()
   var stopped = true
   var transport:Transport = null
   var transportSink:TransportSink = null 
 
-  override def toString = id
-
   override protected def _start(onCompleted:Runnable) = {
     stopped = false
     transportSink = new TransportSink(transport)
@@ -89,11 +84,13 @@ abstract class Connection() extends Defa
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class BrokerConnection(val connector: Connector) extends Connection {
+class BrokerConnection(val connector: Connector, val id:Long) extends Connection {
 
   var protocol = "stomp"
   var protocolHandler: ProtocolHandler = null;
 
+  override def toString = "id: "+id.toString
+
   override protected  def _start(onCompleted:Runnable) = {
     connector.dispatchQueue.retain
     protocolHandler = ProtocolFactory.get(protocol).createProtocolHandler

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Jul  7 04:15:31 2010
@@ -16,21 +16,14 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.java.io.{File}
 import _root_.org.apache.activemq.transport._
-import _root_.org.apache.activemq.Service
-import _root_.java.lang.{String}
-import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import _root_.scala.reflect.BeanProperty
-import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
-import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
-import org.fusesource.hawtbuf._
-import collection.JavaConversions
-import org.apache.activemq.apollo.dto.{ConnectorDTO, BrokerDTO}
-import JavaConversions._
+import org.fusesource.hawtdispatch.{Dispatch}
+import org.apache.activemq.apollo.dto.{ConnectorDTO}
 import org.apache.activemq.wireformat.WireFormatFactory
 import ReporterLevel._
+import org.apache.activemq.apollo.util.LongCounter
+import collection.mutable.HashMap
 
 /**
  * <p>
@@ -75,7 +68,7 @@ object Connector extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Connector(val broker:Broker) extends BaseService with DispatchLogging {
+class Connector(val broker:Broker, val id:Long) extends BaseService with DispatchLogging {
   import Connector._
 
   override protected def log = Connector
@@ -85,9 +78,9 @@ class Connector(val broker:Broker) exten
   var transportServer:TransportServer = _
   var wireFormatFactory:WireFormatFactory = _
 
-  val connections: HashSet[Connection] = new HashSet[Connection]
-
+  val connections = HashMap[Long, BrokerConnection]()
   override def toString = "connector: "+config.id
+  val accept_counter = new LongCounter
 
   object BrokerAcceptListener extends TransportAcceptListener {
     def onAcceptError(error: Exception): Unit = {
@@ -103,7 +96,8 @@ class Connector(val broker:Broker) exten
         transport.setWireformat(wireFormatFactory.createWireFormat)
       }
 
-      var connection = new BrokerConnection(Connector.this)
+      accept_counter.incrementAndGet
+      var connection = new BrokerConnection(Connector.this, broker.connection_id_counter.incrementAndGet)
       connection.transport = transport
 
       if( STICK_ON_THREAD_QUEUES ) {
@@ -112,7 +106,7 @@ class Connector(val broker:Broker) exten
 
       // We release when it gets removed form the connections list.
       connection.dispatchQueue.retain
-      connections.add(connection)
+      connections.put(connection.id, connection)
 
       try {
         connection.start()
@@ -155,7 +149,7 @@ class Connector(val broker:Broker) exten
   override def _stop(onCompleted:Runnable): Unit = {
     transportServer.stop(^{
       val tracker = new LoggingTracker(toString, dispatchQueue)
-      for (connection <- connections) {
+      connections.valuesIterator.foreach { connection=>
         tracker.stop(connection)
       }
       tracker.callback(onCompleted)
@@ -166,8 +160,8 @@ class Connector(val broker:Broker) exten
    * Connections callback into the connector when they are stopped so that we can
    * stop tracking them.
    */
-  def stopped(connection:Connection) = ^{
-    if( connections.remove(connection) ) {
+  def stopped(connection:BrokerConnection) = ^{
+    if( connections.remove(connection.id).isDefined ) {
       connection.dispatchQueue.release
     }
   } |>>: dispatchQueue

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:15:31 2010
@@ -60,7 +60,7 @@ object Queue extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val host: VirtualHost, val destination: Destination, val queueKey: Long = -1L) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
+class Queue(val host: VirtualHost, val destination: Destination, val id: Long) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
   override protected def log = Queue
 
   import Queue._
@@ -112,12 +112,12 @@ class Queue(val host: VirtualHost, val d
    * Subscribers that consume slower than this rate per seconds will be considered
    * slow.
    */
-  var tune_slow_subscription_rate = 1000*1024
+  var tune_slow_subscription_rate = 500*1024
 
   /**
    * The number of milliseconds between slow consumer checks.
    */
-  var tune_slow_check_interval = 200L
+  var tune_slow_check_interval = 500L
 
   /**
    * Should this queue persistently store it's entries?
@@ -158,8 +158,8 @@ class Queue(val host: VirtualHost, val d
   var flushing_size = 0
   var flushed_items = 0
 
-  private var capacity = tune_producer_buffer
-  var size = 0
+  var capacity = 0
+  var capacity_used = 0
 
   protected def _start(onCompleted: Runnable) = {
 
@@ -179,7 +179,7 @@ class Queue(val host: VirtualHost, val d
     }
 
     if( tune_persistent ) {
-      host.store.listQueueEntryRanges(queueKey, tune_flush_range_size) { ranges=>
+      host.store.listQueueEntryRanges(id, tune_flush_range_size) { ranges=>
         dispatchQueue {
           if( !ranges.isEmpty ) {
 
@@ -214,7 +214,7 @@ class Queue(val host: VirtualHost, val d
 
     var refiller: Runnable = null
 
-    def full = (size >= capacity) || !serviceState.isStarted
+    def full = (capacity_used >= capacity) || !serviceState.isStarted
 
     def offer(delivery: Delivery): Boolean = {
       if (full) {
@@ -265,7 +265,7 @@ class Queue(val host: VirtualHost, val d
 
   var check_counter = 0
   def display_stats: Unit = {
-    info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, size, capacity)
+    info("contains: %d messages worth %,.2f MB of data, producers are %s, %d/%d buffer space used.", queue_items, (queue_size.toFloat / (1024 * 1024)), {if (messages.full) "being throttled" else "not being throttled"}, capacity_used, capacity)
     info("total messages enqueued %d, dequeues %d ", enqueue_item_counter, dequeue_item_counter)
   }
 
@@ -309,9 +309,9 @@ class Queue(val host: VirtualHost, val d
 
         if( (check_counter%25)==0 ) {
           display_stats
-          if (!all_subscriptions.isEmpty) {
-            display_active_entries
-          }
+//          if (!all_subscriptions.isEmpty) {
+//            display_active_entries
+//          }
         }
 
         // target tune_min_subscription_rate / sec
@@ -327,8 +327,8 @@ class Queue(val host: VirtualHost, val d
           // Skip over new consumers...
           if( sub.advanced_size != 0 ) {
 
-            val cursor_delta = sub.advanced_size - sub.last_cursored_size
-            sub.last_cursored_size = sub.advanced_size
+            val cursor_delta = sub.advanced_size - sub.last_advanced_size
+            sub.last_advanced_size = sub.advanced_size
 
             // If the subscription is NOT slow if it's been tail parked or
             // it's been parking and cursoring through the data at the tune_slow_subscription_rate 
@@ -634,7 +634,6 @@ class QueueEntry(val queue:Queue, val se
 
   def init(delivery:Delivery):QueueEntry = {
     state = new Loaded(delivery, false)
-    queue.size += size
     this
   }
 
@@ -689,7 +688,7 @@ class QueueEntry(val queue:Queue, val se
 
   def toQueueEntryRecord = {
     val qer = new QueueEntryRecord
-    qer.queueKey = queue.queueKey
+    qer.queueKey = queue.id
     qer.queueSeq = seq
     qer.messageKey = state.messageKey
     qer.size = state.size
@@ -714,6 +713,8 @@ class QueueEntry(val queue:Queue, val se
   def as_flushed_range = state.as_flushed_range
   def as_loaded = state.as_loaded
 
+  def label = state.label
+
   def is_tail = this == queue.tail_entry
   def is_head = this == queue.head_entry
 
@@ -763,6 +764,11 @@ class QueueEntry(val queue:Queue, val se
     def count = 0
 
     /**
+     * Returns a string label used to describe this state.
+     */
+    def label:String
+
+    /**
      * Gets the message key for the entry.
      * @returns -1 if it is not known.
      */
@@ -868,6 +874,7 @@ class QueueEntry(val queue:Queue, val se
    */
   class Head extends EntryState {
 
+    def label = "head"
     override  def toString = "head"
     override def as_head = this
 
@@ -899,6 +906,7 @@ class QueueEntry(val queue:Queue, val se
    */
   class Tail extends EntryState {
 
+    def label = "tail"
     override  def toString = "tail"
     override def as_tail:Tail = this
 
@@ -919,6 +927,7 @@ class QueueEntry(val queue:Queue, val se
     var acquired = false
     var flushing = false
 
+    def label = "loaded"
     override def toString = { "loaded:{ stored: "+stored+", flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
 
     override def count = 1
@@ -989,7 +998,7 @@ class QueueEntry(val queue:Queue, val se
       delivery.uow = null
       if( flushing ) {
         queue.flushing_size-=size
-        queue.size -= size
+        queue.capacity_used -= size
         state = new Flushed(delivery.storeKey, size)
 
         if( can_combine_with_prev ) {
@@ -1010,7 +1019,7 @@ class QueueEntry(val queue:Queue, val se
         flushing = false
         queue.flushing_size-=size
       }
-      queue.size -= size
+      queue.capacity_used -= size
       super.remove
     }
 
@@ -1111,12 +1120,14 @@ class QueueEntry(val queue:Queue, val se
 
     var loading = false
 
+
     override def count = 1
 
     override def as_flushed = this
 
     override def is_flushed_or_flushing = true
 
+    def label = "flushed"
     override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
 
     override def load() = {
@@ -1157,7 +1168,7 @@ class QueueEntry(val queue:Queue, val se
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
 
-        queue.size += size
+        queue.capacity_used += size
         queue.flushed_items -= 1
         state = new Loaded(delivery, true)
       } else {
@@ -1212,12 +1223,13 @@ class QueueEntry(val queue:Queue, val se
 
     override def is_flushed_or_flushing = true
 
-    override def toString = { "flushed_group:{ loading: "+loading+", count: "+count+", size: "+size+"}" }
+    def label = "flushed_range"
+    override def toString = { "flushed_range:{ loading: "+loading+", count: "+count+", size: "+size+"}" }
 
     override def load() = {
       if( !loading ) {
         loading = true
-        queue.host.store.listQueueEntries(queue.queueKey, seq, last) { records =>
+        queue.host.store.listQueueEntries(queue.id, seq, last) { records =>
           queue.dispatchQueue {
 
             var item_count=0
@@ -1307,7 +1319,7 @@ class Subscription(queue:Queue) extends 
   var advanced_size = 0L
 
   // Vars used to detect slow consumers.
-  var last_cursored_size = 0L
+  var last_advanced_size = 0L
   var tail_parkings = 0
   var slow_intervals = 0
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 04:15:31 2010
@@ -21,9 +21,10 @@ import _root_.org.fusesource.hawtbuf._
 import _root_.org.fusesource.hawtdispatch._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
-import java.util.HashMap
 import path.PathMap
 import collection.JavaConversions
+import org.apache.activemq.apollo.util.LongCounter
+import collection.mutable.HashMap
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -62,7 +63,6 @@ class Domain {
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object Router extends Log {
-
 }
 
 /**
@@ -80,10 +80,14 @@ object Router extends Log {
  */
 class Router(val host:VirtualHost) extends DispatchLogging {
 
+  val destination_id_counter = new LongCounter
+
   override protected def log = Router
   protected def dispatchQueue:DispatchQueue = host.dispatchQueue
 
   trait DestinationNode {
+    val destination:Destination
+    val id = destination_id_counter.incrementAndGet
     var targets = List[DeliveryConsumer]()
     var routes = List[DeliveryProducerRoute]()
 
@@ -97,7 +101,7 @@ class Router(val host:VirtualHost) exten
     }
   }
 
-  class TopicDestinationNode extends DestinationNode {
+  class TopicDestinationNode(val destination:Destination) extends DestinationNode {
     def on_bind(x:List[DeliveryConsumer]) =  {
       targets = x ::: targets
       routes.foreach({r=>
@@ -119,7 +123,7 @@ class Router(val host:VirtualHost) exten
     }
   }
 
-  class QueueDestinationNode(destination:Destination) extends DestinationNode {
+  class QueueDestinationNode(val destination:Destination) extends DestinationNode {
     var queue:Queue = null
 
     // once the queue is created.. connect it up with the producers and targets.
@@ -159,16 +163,13 @@ class Router(val host:VirtualHost) exten
   var destinations = new HashMap[Destination, DestinationNode]()
 
   private def get(destination:Destination):DestinationNode = {
-    var result = destinations.get(destination)
-    if( result ==null ) {
+    destinations.getOrElseUpdate(destination,
       if( isTopic(destination) ) {
-        result = new TopicDestinationNode
+        new TopicDestinationNode(destination)
       } else {
-        result = new QueueDestinationNode(destination)
+        new QueueDestinationNode(destination)
       }
-      destinations.put(destination, result)
-    }
-    result
+    )
   }
 
   def bind(destination:Destination, targets:List[DeliveryConsumer]) = retaining(targets) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:15:31 2010
@@ -33,6 +33,7 @@ import org.apache.activemq.apollo.store.
 import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, CassandraStoreDTO, VirtualHostDTO}
 import java.io.File
 import java.util.concurrent.TimeUnit
+import org.apache.activemq.apollo.util.LongCounter
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -84,15 +85,15 @@ object VirtualHost extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class VirtualHost(val broker: Broker) extends BaseService with DispatchLogging with LoggingReporter {
+class VirtualHost(val broker: Broker, val id:Long) extends BaseService with DispatchLogging with LoggingReporter {
   import VirtualHost._
   
   override protected def log = VirtualHost
   override val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
 
   var config:VirtualHostDTO = _
-  private val queues = new HashMap[AsciiBuffer, Queue]()
-  private val durableSubs = new HashMap[String, DurableSubscription]()
+  val queues = new HashMap[AsciiBuffer, Queue]()
+  val durableSubs = new HashMap[String, DurableSubscription]()
   val router = new Router(this)
 
   var names:List[String] = Nil;
@@ -102,8 +103,8 @@ class VirtualHost(val broker: Broker) ex
 
   var store:Store = null
   var transactionManager:TransactionManagerX = new TransactionManagerX
-
   var protocols = Map[AsciiBuffer, WireFormat]()
+  val queue_id_counter = new LongCounter
 
   override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
 
@@ -128,15 +129,27 @@ class VirtualHost(val broker: Broker) ex
     store = StoreFactory.create(config.store)
     if( store!=null ) {
       store.configure(config.store, this)
-      val task = tracker.task("store startup")
+      val storeStartupDone = tracker.task("store startup")
       store.start {
+
+        val getKeyDone = tracker.task("store get last queue key")
+        store.getLastQueueKey{ key=>
+          key match {
+            case Some(x)=>
+              queue_id_counter.set(key.get)
+            case None =>
+              warn("Could not get last queue key")
+          }
+          getKeyDone.run
+        }
+
         if( config.purgeOnStartup ) {
-          task.name = "store purge"
+          storeStartupDone.name = "store purge"
           store.purge {
-            task.run
+            storeStartupDone.run
           }
         } else {
-          task.name = "store recover queues"
+          storeStartupDone.name = "store recover queues"
           store.listQueues { queueKeys =>
             for( queueKey <- queueKeys) {
               val task = tracker.task("store load queue key: "+queueKey)
@@ -160,7 +173,7 @@ class VirtualHost(val broker: Broker) ex
                 }
               }
             }
-            task.run
+            storeStartupDone.run
           }
         }
       }
@@ -263,21 +276,23 @@ class VirtualHost(val broker: Broker) ex
     if( store!=null ) {
       val record = new QueueRecord
       record.name = name
+      record.key = queue_id_counter.incrementAndGet
+
       store.addQueue(record) { rc =>
         rc match {
-          case Some(queueKey) =>
+          case true =>
             dispatchQueue {
-              val queue = new Queue(this, dest, queueKey)
+              val queue = new Queue(this, dest, record.key)
               queue.start()
               queues.put(dest.getName, queue)
               cb(queue)
             }
-          case None => // store could not create
+          case false => // store could not create
             cb(null)
         }
       }
     } else {
-      val queue = new Queue(this, dest)
+      val queue = new Queue(this, dest, queue_id_counter.incrementAndGet)
       queue.start()
       queues.put(dest.getName, queue)
       cb(queue)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:15:31 2010
@@ -71,7 +71,6 @@ class CassandraStore extends Store with 
   /////////////////////////////////////////////////////////////////////
   val dispatchQueue = createQueue("cassandra store")
 
-  var next_queue_key = new AtomicLong(1)
   var next_msg_key = new AtomicLong(1)
 
   val client = new CassandraClient()
@@ -80,6 +79,9 @@ class CassandraStore extends Store with 
 
   def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
 
+
+  def storeType = "cassandra"
+
   def configure(config: CassandraStoreDTO, reporter: Reporter) = {
     if ( CassandraStore.validate(config, reporter) < ERROR ) {
       if( serviceState.isStarted ) {
@@ -155,18 +157,23 @@ class CassandraStore extends Store with 
   def purge(callback: =>Unit) = {
     blocking {
       client.purge
-      next_queue_key.set(1)
       next_msg_key.set(1)
       callback
     }
   }
 
-  def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
-    val key = next_queue_key.getAndIncrement
-    record.key = key
+  /**
+   * Gets the last queue key identifier stored.
+   */
+  def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
+    // TODO:
+    callback( Some(1L) )
+  }
+
+  def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
     blocking {
       client.addQueue(record)
-      callback(Some(key))
+      callback(true)
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java Wed Jul  7 04:15:31 2010
@@ -30,7 +30,7 @@ import javax.xml.bind.annotation.XmlRoot
  */
 @XmlRootElement(name="broker")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class BrokerDTO extends ServiceDTO {
+public class BrokerDTO extends ServiceDTO<String> {
 
     /**
      * Used to track config revisions.

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java Wed Jul  7 04:15:31 2010
@@ -16,10 +16,9 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * <p>
@@ -32,17 +31,27 @@ import javax.xml.bind.annotation.XmlRoot
 public class BrokerStatusDTO extends ServiceStatusDTO {
 
     /**
-     * The current time on the broker machine.
+     * The current time on the broker machine.  In milliseconds since the epoch.
      */
 	@XmlAttribute(name="current-time")
-	public String currentTime;
+	public long currentTime;
 
     /**
-     * Since when has the broker in in this state?
+     * Ids of all the virtual hosts running on the broker
      */
-	@XmlAttribute(name="in-state-since")
-	public String inStateSince;
+    @XmlElement(name="virtual-host")
+    public List<Long> virtualHosts = new ArrayList<Long>();
 
+    /**
+     * Ids of all the connectors running on the broker
+     */
+    @XmlElement(name="connectors")
+    public List<Long> connectors = new ArrayList<Long>();
 
+    /**
+     * The current running configuration of the object
+     */
+    @XmlElement(name="config")
+    public BrokerDTO config = null;
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerSummaryDTO.java Wed Jul  7 04:15:31 2010
@@ -29,7 +29,7 @@ import javax.xml.bind.annotation.XmlRoot
  */
 @XmlRootElement(name="broker-summary")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class BrokerSummaryDTO extends IdDTO {
+public class BrokerSummaryDTO extends StringIdDTO {
 
     /**
      * The latest revision of the broker config.

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorDTO.java Wed Jul  7 04:15:31 2010
@@ -26,7 +26,7 @@ import javax.xml.bind.annotation.*;
  */
 @XmlRootElement(name = "connector")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ConnectorDTO extends ServiceDTO {
+public class ConnectorDTO extends ServiceDTO<String> {
 
     /**
      * The transport uri which it will accept connections on.

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java Wed Jul  7 04:15:31 2010
@@ -16,10 +16,9 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * <p>
@@ -29,25 +28,24 @@ import javax.xml.bind.annotation.XmlRoot
  */
 @XmlRootElement(name="connector-status")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ConnectorStatusDTO {
+public class ConnectorStatusDTO extends ServiceStatusDTO {
 
     /**
-     * A unique id of the connector.
+     * The number of connections that this connector has accepted.
      */
-	@XmlAttribute(name="id")
-	public String id;
+	@XmlAttribute(name="accepted")
+	public Long accepted;
 
     /**
-     * The state of the object.
+     * Ids of all open connections that the connector is managing.
      */
-	@XmlAttribute(name="state")
-	public String state;
+    @XmlElement(name="connection")
+    public List<Long> connections = new ArrayList<Long>();
 
     /**
-     * The number of connections that this connector has accepted.
+     * The current running configuration of the object
      */
-	@XmlAttribute(name="accepted")
-	public Long accepted;
+    @XmlElement(name="config")
+    public ConnectorDTO config = null;
 
-    
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java Wed Jul  7 04:15:31 2010
@@ -17,21 +17,26 @@
 package org.apache.activemq.apollo.dto;
 
 import javax.xml.bind.annotation.*;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="id")
+@XmlRootElement(name="destination-summary")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class IdDTO {
+public class DestinationStatusDTO extends IdDTO<Long> {
 
     /**
-     * A unique id of the object within it's container
+     * The destination name
      */
-	@XmlAttribute(name="id")
-	public String id;
-
+    @XmlElement(name="name")
+    public String name;
 
+    /**
+     * The routing domain
+     */
+    @XmlElement(name="domain")
+    public String domain;
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationSummaryDTO.java Wed Jul  7 04:15:31 2010
@@ -21,17 +21,25 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
+ * <p>
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="id")
+@XmlRootElement(name="virtual-host-status")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class IdDTO {
+public class DestinationSummaryDTO extends ServiceStatusDTO {
 
     /**
-     * A unique id of the object within it's container
+     * The type of store the virtual host is using.
      */
-	@XmlAttribute(name="id")
-	public String id;
+    @XmlAttribute(name="store-type")
+    public String storeType;
 
+    /**
+     * Ids of all the destinations running on the broker
+     */
+    @XmlElement(name="destination")
+    public List<Long> destinations = new ArrayList<Long>();
 
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java Wed Jul  7 04:15:31 2010
@@ -27,22 +27,42 @@ import javax.xml.bind.annotation.XmlRoot
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="broker-status")
+@XmlRootElement(name="destination-status")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class BrokerStatusDTO extends ServiceStatusDTO {
+public class EntryStatusDTO extends IdDTO<Long> {
 
     /**
-     * The current time on the broker machine.
+     * A unique id of the object within its container
      */
-	@XmlAttribute(name="current-time")
-	public String currentTime;
+	@XmlAttribute(name="enqueue-item-counter")
+	public long id;
 
-    /**
-     * Since when has the broker in in this state?
-     */
-	@XmlAttribute(name="in-state-since")
-	public String inStateSince;
+    @XmlAttribute(name="enqueue-item-counter")
+    public long enqueueItemCounter;
+    @XmlAttribute(name="dequeue-item-counter")
+    public long dequeueItemCounter;
+    @XmlAttribute(name="enqueue-size-counter")
+    public long enqueueSizeCounter;
+    @XmlAttribute(name="dequeue-size-counter")
+    public long dequeueSizeCounter;
+    @XmlAttribute(name="nack-item-counter")
+    public long nackItemCounter;
+    @XmlAttribute(name="nack-size-counter")
+    public long nackSizeCounter;
+
+    @XmlAttribute(name="queue-size")
+    public long queueSize;
+    @XmlAttribute(name="queue-items")
+    public long queueItems;
 
+    @XmlAttribute(name="loading-size")
+    public int loadingSize;
+    @XmlAttribute(name="flushing-size")
+    public int flushingSize;
+    @XmlAttribute(name="flushed-items")
+    public int flushedItems;
 
+    @XmlAttribute(name="capacity")
+    public int capacity;
 
-}
+}
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/LongIdDTO.java Wed Jul  7 04:15:31 2010
@@ -26,13 +26,13 @@ import javax.xml.bind.annotation.XmlRoot
  */
 @XmlRootElement(name="id")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ServiceDTO extends IdDTO {
+public class LongIdDTO {
 
     /**
-     * Should this service be running?
+     * A unique id of the object within its container
      */
-    @XmlAttribute(name="enabled")
-    public boolean enabled = true;
+	@XmlAttribute(name="id")
+	public long id;
 
 
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Wed Jul  7 04:15:31 2010
@@ -18,8 +18,10 @@ package org.apache.activemq.apollo.dto;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * <p>
@@ -27,27 +29,25 @@ import javax.xml.bind.annotation.XmlRoot
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="connector-status")
+@XmlRootElement(name="destination-status")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ConnectorStatusDTO {
+public class QueueStatusDTO extends DestinationSummaryDTO {
 
     /**
-     * A unique id of the connector.
+     * Ids of all connections that are producing to the destination
      */
-	@XmlAttribute(name="id")
-	public String id;
+    @XmlElement(name="producer")
+    public List<Long> producers = new ArrayList<Long>();
 
     /**
-     * The state of the object.
+     * Ids of all connections that are consuming from the destination
      */
-	@XmlAttribute(name="state")
-	public String state;
+    @XmlElement(name="consumer")
+    public List<Long> consumers = new ArrayList<Long>();
 
     /**
-     * The number of connections that this connector has accepted.
+     * Ids of all queues that are associated with the destination
      */
-	@XmlAttribute(name="accepted")
-	public Long accepted;
-
-    
-}
+    @XmlElement(name="queue")
+    public List<Long> queues = new ArrayList<Long>();
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceDTO.java Wed Jul  7 04:15:31 2010
@@ -26,7 +26,7 @@ import javax.xml.bind.annotation.XmlRoot
  */
 @XmlRootElement(name="id")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ServiceDTO extends IdDTO {
+public class ServiceDTO<ID> extends StringIdDTO {
 
     /**
      * Should this service be running?

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ServiceStatusDTO.java Wed Jul  7 04:15:31 2010
@@ -26,7 +26,7 @@ import javax.xml.bind.annotation.XmlRoot
  */
 @XmlRootElement(name="service-status")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ServiceStatusDTO extends IdDTO {
+public class ServiceStatusDTO extends LongIdDTO {
 
     /**
      * The state of the service.
@@ -34,5 +34,10 @@ public class ServiceStatusDTO extends Id
 	@XmlAttribute(name="state")
 	public String state;
 
+    /**
+     * Since when has the service been in this state?  In milliseconds since the epoch.
+     */
+	@XmlAttribute(name="state-since")
+	public long stateSince;
 
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java (from r961186, activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java&r1=961186&r2=961187&rev=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/IdDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/StringIdDTO.java Wed Jul  7 04:15:31 2010
@@ -25,7 +25,7 @@ import java.util.List;
  */
 @XmlRootElement(name="id")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class IdDTO {
+public class StringIdDTO {
 
     /**
      * A unique id of the object within it's container

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Wed Jul  7 04:15:31 2010
@@ -25,7 +25,7 @@ import javax.xml.bind.annotation.*;
  */
 @XmlRootElement(name = "virtual-host")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class VirtualHostDTO extends ServiceDTO {
+public class VirtualHostDTO extends ServiceDTO<String> {
 
     @XmlElement(name="host-name", required=true)
     public ArrayList<String> hostNames = new ArrayList<String>();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostStatusDTO.java Wed Jul  7 04:15:31 2010
@@ -16,10 +16,9 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * <p>
@@ -30,5 +29,24 @@ import javax.xml.bind.annotation.XmlRoot
 @XmlRootElement(name="virtual-host-status")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class VirtualHostStatusDTO extends ServiceStatusDTO {
-        
+
+    /**
+     * The type of store the virtual host is using.
+     */
+    @XmlAttribute(name="store-type")
+    public String storeType;
+
+    /**
+     * Summaries of all the destinations running on the virtual host
+     */
+    @XmlElement(name="destination")
+    public List<DestinationSummaryDTO> destinations = new ArrayList<DestinationSummaryDTO>();
+
+
+    /**
+     * The current running configuration of the object
+     */
+    @XmlElement(name="config")
+    public VirtualHostDTO config = null;
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:15:31 2010
@@ -79,6 +79,8 @@ class HawtDBStore extends Store with Bas
 
   def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[HawtDBStoreDTO], reporter)
 
+  def storeType = "hawtdb"
+
   def configure(config: HawtDBStoreDTO, reporter: Reporter) = {
     if ( HawtDBStore.validate(config, reporter) < ERROR ) {
       if( serviceState.isStarted ) {
@@ -139,10 +141,15 @@ class HawtDBStore extends Store with Bas
   }
 
 
-  def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
-    val key = next_queue_key.getAndIncrement
-    record.key = key
-    client.addQueue(record, ^{ callback(Some(key)) })
+  /**
+   * Gets the last queue key identifier stored.
+   */
+  def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
+    callback(Some(client.rootBuffer.getLastQueueKey.longValue))
+  }
+
+  def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
+    client.addQueue(record, ^{ callback(true) })
   }
 
   def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:15:31 2010
@@ -31,6 +31,8 @@ import org.apache.activemq.apollo.Servic
  */
 trait Store extends ServiceTrait {
 
+  def storeType:String
+
   /**
    * Creates a store uow which is used to perform persistent
    * operations as unit of work.
@@ -50,12 +52,17 @@ trait Store extends ServiceTrait {
   def purge(callback: =>Unit):Unit
 
   /**
+   * Gets the last queue key identifier stored.
+   */
+  def getLastQueueKey(callback:(Option[Long])=>Unit):Unit
+
+  /**
    * Adds a queue.
    * 
    * This method auto generates and assigns the key field of the queue record and
-   * returns it via the callback.
+   * returns true if it succeeded.
    */
-  def addQueue(record:QueueRecord)(callback:(Option[Long])=>Unit):Unit
+  def addQueue(record:QueueRecord)(callback:(Boolean)=>Unit):Unit
 
   /**
    * Removes a queue. Success is reported via the callback.

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/StoreFactory.scala Wed Jul  7 04:15:31 2010
@@ -66,7 +66,7 @@ object StoreFactory {
         return rc
       }
     }
-    throw new IllegalArgumentException("Uknonwn store configuration type: "+config.getClass)
+    throw new IllegalArgumentException("Uknonwn store type: "+config.getClass)
   }
 
 
@@ -81,7 +81,7 @@ object StoreFactory {
         }
       }
     }
-    reporter.report(ERROR, "Uknonwn store configuration type: "+config.getClass)
+    reporter.report(ERROR, "Uknonwn store type: "+config.getClass)
     ERROR
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Wed Jul  7 04:15:31 2010
@@ -25,6 +25,7 @@ import org.apache.activemq.apollo.store.
 import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
 import collection.mutable.ListBuffer
 import java.util.concurrent.atomic.{AtomicLong, AtomicInteger, AtomicBoolean}
+import org.apache.activemq.apollo.util.LongCounter
 
 /**
  * <p>Implements generic testing of Store implementations.</p>
@@ -83,12 +84,15 @@ abstract class StoreBenchmarkSupport ext
     }
   }
 
+  val queue_key_counter = new LongCounter
+
   def addQueue(name:String):Long = {
     var queueA = new QueueRecord
+    queueA.key = queue_key_counter.incrementAndGet
     queueA.name = ascii(name)
-    val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
-    expect(true)(rc.isDefined)
-    rc.get
+    val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
+    expect(true)(rc)
+    queueA.key
   }
 
   def addMessage(batch:StoreUOW, content:String):Long = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul  7 04:15:31 2010
@@ -24,6 +24,7 @@ import java.util.concurrent.{TimeUnit, C
 import org.apache.activemq.apollo.store.{QueueEntryRecord, QueueStatus, QueueRecord, MessageRecord}
 import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
 import collection.mutable.ListBuffer
+import org.apache.activemq.apollo.util.LongCounter
 
 /**
  * <p>Implements generic testing of Store implementations.</p>
@@ -82,12 +83,15 @@ abstract class StoreFunSuiteSupport exte
     }
   }
 
+  val queue_key_counter = new LongCounter
+
   def addQueue(name:String):Long = {
     var queueA = new QueueRecord
+    queueA.key = queue_key_counter.incrementAndGet
     queueA.name = ascii(name)
-    val rc:Option[Long] = CB( cb=> store.addQueue(queueA)(cb) )
-    expect(true)(rc.isDefined)
-    rc.get
+    val rc:Boolean = CB( cb=> store.addQueue(queueA)(cb) )
+    expect(true)(rc)
+    queueA.key
   }
 
   def addMessage(batch:StoreUOW, content:String):Long = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul  7 04:15:31 2010
@@ -34,6 +34,9 @@ trait BaseService extends Service with L
   override protected def log:Log = BaseService
 
   sealed class State {
+
+    val since = System.currentTimeMillis
+
     override def toString = getClass.getSimpleName
     def isCreated = false
     def isStarting = false
@@ -49,12 +52,12 @@ trait BaseService extends Service with L
     def done = { callbacks.foreach(_.run); callbacks=Nil }
   }
 
-  protected object CREATED extends State { override def isCreated = true  }
-  protected class  STARTING extends State with CallbackSupport { override def isStarting = true  }
-  protected object FAILED extends State { override def isFailed = true  }
-  protected object STARTED extends State { override def isStarted = true  }
-  protected class  STOPPING extends State with CallbackSupport { override def isStopping = true  }
-  protected object STOPPED extends State { override def isStopped = true  }
+  protected class CREATED extends State { override def isCreated = true  }
+  protected class STARTING extends State with CallbackSupport { override def isStarting = true  }
+  protected class FAILED extends State { override def isFailed = true  }
+  protected class STARTED extends State { override def isStarted = true  }
+  protected class STOPPING extends State with CallbackSupport { override def isStopping = true  }
+  protected class STOPPED extends State { override def isStopped = true  }
 
   protected val dispatchQueue:DispatchQueue
 
@@ -62,7 +65,8 @@ trait BaseService extends Service with L
   final def stop() = stop(null)
 
   @volatile
-  protected var _serviceState:State = CREATED
+  protected var _serviceState:State = new CREATED
+
   def serviceState = _serviceState
 
   @volatile
@@ -76,7 +80,7 @@ trait BaseService extends Service with L
       _serviceState = state
       try {
         _start(^ {
-          _serviceState = STARTED
+          _serviceState = new STARTED
           state.done
         })
       }
@@ -84,7 +88,7 @@ trait BaseService extends Service with L
         case e:Exception =>
           error(e, "Start failed due to %s", e)
           _serviceFailure = e
-          _serviceState = FAILED
+          _serviceState = new FAILED
           state.done
       }
     }
@@ -94,13 +98,13 @@ trait BaseService extends Service with L
       }
     }
     _serviceState match {
-      case CREATED =>
+      case state:CREATED =>
         do_start
-      case STOPPED =>
+      case state:STOPPED =>
         do_start
       case state:STARTING =>
         state << onCompleted
-      case STARTED =>
+      case state:STARTED =>
         done
       case state =>
         done
@@ -116,13 +120,13 @@ trait BaseService extends Service with L
         }
       }
       _serviceState match {
-        case STARTED =>
+        case state:STARTED =>
           val state = new STOPPING
           state << onCompleted
           _serviceState = state
           try {
             _stop(^ {
-              _serviceState = STOPPED
+              _serviceState = new STOPPED
               state.done
             })
           }
@@ -130,12 +134,12 @@ trait BaseService extends Service with L
             case e:Exception =>
               error(e, "Stop failed due to: %s", e)
               _serviceFailure = e
-              _serviceState = FAILED
+              _serviceState = new FAILED
               state.done
           }
         case state:STOPPING =>
           state << onCompleted
-        case STOPPED =>
+        case state:STOPPED =>
           done
         case state =>
           done

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Wed Jul  7 04:15:31 2010
@@ -32,6 +32,7 @@ class LongCounter(private var value:Long
   def clear() = value=0
 
   def get() = value
+  def set(value:Long) = this.value = value 
 
   def incrementAndGet() = addAndGet(1)
   def decrementAndGet() = addAndGet(-1)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/BrokerRegistry.scala Wed Jul  7 04:15:31 2010
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.apollo
 
-import java.util.HashMap
 import org.apache.activemq.apollo.broker.Broker
+import java.util.concurrent.ConcurrentHashMap
 
 /**
  * <p>
@@ -29,17 +29,11 @@ object BrokerRegistry {
 
   var configStore:ConfigStore = _
 
-  private val _brokers = new HashMap[String, Broker]()
+  val brokers = new ConcurrentHashMap[String, Broker]()
 
-  def get(id:String) = _brokers.synchronized {
-    _brokers.get(id)
-  }
-
-  def add(broker:Broker) = _brokers.synchronized {
-    _brokers.put(broker.config.id, broker)
-  }
-
-  def remove(id:String) = _brokers.synchronized {
-    _brokers.remove(id)
-  }
+  def get(id:String) = brokers.get(id)
+
+  def add(broker:Broker) = brokers.put(broker.config.id, broker)
+
+  def remove(id:String) = brokers.remove(id)
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/ServletContextListener.scala Wed Jul  7 04:15:31 2010
@@ -34,8 +34,6 @@ import org.fusesource.hawtdispatch.Scala
  */
 class ServletContextListener extends GuiceServletContextListener {
 
-  var broker:Broker = null
-
   override def contextInitialized(servletContextEvent: ServletContextEvent) = {
 
     try {
@@ -43,19 +41,25 @@ class ServletContextListener extends Gui
 
       // Brokers startup async.
       BrokerRegistry.configStore.foreachBroker(true) { config=>
+
+        println("Config store contained broker: "+config.id);
+
         // Only start the broker up if it's enabled..
         if( config.enabled ) {
+
+          println("starting broker: "+config.id);
           val broker = new Broker()
           broker.config = config
           BrokerRegistry.add(broker)
           broker.start()
+
         }
+
       }
 
     }
     catch {
       case e:Exception =>
-        broker = null
         e.printStackTrace
     }
 
@@ -64,7 +68,8 @@ class ServletContextListener extends Gui
 
   override def contextDestroyed(servletContextEvent: ServletContextEvent) = {
     super.contextDestroyed(servletContextEvent);
-    val tracker = new LoggingTracker("broker shutdown")
+    
+    val tracker = new LoggingTracker("webapp shutdown")
     BrokerRegistry.configStore.foreachBroker(false) { config=>
       // remove started brokers what we configured..
       val broker = BrokerRegistry.remove(config.id);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerStatus.scala Wed Jul  7 04:15:31 2010
@@ -16,10 +16,17 @@
  */
 package org.apache.activemq.apollo.web.resources;
 
-import java.lang.String
 import javax.ws.rs._
-import reflect.{BeanProperty}
-import org.apache.activemq.apollo.dto.{ConnectionStatusDTO, ConnectorStatusDTO, VirtualHostStatusDTO, BrokerStatusDTO}
+import core.Response
+import org.apache.activemq.apollo.BrokerRegistry
+import Response.Status._
+import java.util.List
+import org.apache.activemq.apollo.dto._
+import java.{lang => jl}
+import collection.JavaConversions
+import org.fusesource.hawtdispatch.{ScalaDispatch, Future}
+import ScalaDispatch._
+import org.apache.activemq.apollo.broker._
 
 /**
  * <p>
@@ -28,55 +35,188 @@ import org.apache.activemq.apollo.dto.{C
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class BrokerStatus(parent:Broker, @BeanProperty id:String) extends Resource {
+case class BrokerStatus(parent:Broker) extends Resource {
+
+  val broker:org.apache.activemq.apollo.broker.Broker = BrokerRegistry.get(parent.id)
+  if( broker == null ) {
+    println("not in regisitry: "+BrokerRegistry.brokers)
+    result(NOT_FOUND)
+  }
+
   @GET
   def get() = {
-    val rc = new BrokerStatusDTO
-    rc.id = id
-    rc
+    println("get hit")
+    Future[BrokerStatusDTO] { cb=> // cb receives the finished DTO
+      broker.dispatchQueue { // build the snapshot on the broker's own dispatch queue
+        println("building result...")
+        val result = new BrokerStatusDTO
+
+        result.id = broker.id
+        result.currentTime = System.currentTimeMillis
+        result.state = broker.serviceState.toString
+        result.stateSince = broker.serviceState.since // timestamp captured when the current State object was created
+        result.config = broker.config
+
+        broker.connectors.foreach{ c=>
+          result.connectors.add(c.id)
+        }
+
+        broker.virtualHosts.values.foreach{ host=>
+          result.virtualHosts.add( host.id )
+        }
+
+        // hand the completed status back to the waiting Future
+        cb(result)
+      }
+    }
   }
 
+
   @Path("virtual-hosts")
-  def virtualHosts :Array[VirtualHostStatus] = null
-  @Path("virtual-hosts/{id}")
-  def virtualHost(@PathParam("id") id : String):VirtualHostStatus = null
+  def virtualHosts :Array[jl.Long] = {
+    val list: List[jl.Long] = get.virtualHosts
+    list.toArray(new Array[jl.Long](list.size))
+  }
 
-  @Path("connectors")
-  def connectors :Array[ConnectorStatus] = null
-  @Path("connectors/{id}")
-  def connector(@PathParam("id") id : String):ConnectorStatus = null
+  private def with_virtual_host[T](id:Long)(func: (VirtualHost, Option[T]=>Unit)=>Unit):T = {
+    Future[Option[T]] { cb=>
+      broker.virtualHosts.valuesIterator.find( _.id == id) match {
+        case Some(virtualHost)=>
+          virtualHost.dispatchQueue {
+            func(virtualHost, cb)
+          }
+        case None=> cb(None)
+      }
+    }.getOrElse(result(NOT_FOUND))
+  }
 
-  @Path("connections")
-  def connections :Array[ConnectionStatus] = null
-  @Path("connections/{id}")
-  def connection(@PathParam("id") id : String):ConnectionStatus = null
-}
+  @Path("virtual-hosts/{id}")
+  def virtualHost(@PathParam("id") id : Long):VirtualHostStatusDTO = {
+    with_virtual_host(id) { case (virtualHost,cb) =>
+      val result = new VirtualHostStatusDTO
+      result.id = virtualHost.id
+      result.state = virtualHost.serviceState.toString
+      result.stateSince = virtualHost.serviceState.since
+      result.config = virtualHost.config
+
+      if( virtualHost.store != null ) {
+        result.storeType = virtualHost.store.storeType
+      }
+      virtualHost.router.destinations.valuesIterator.foreach { node=>
+        val summary = new DestinationSummaryDTO
+        summary.id = node.id
+        summary.name = node.destination.getName.toString
+        summary.domain = node.destination.getDomain.toString
+        result.destinations.add(summary)
+      }
+      cb(Some(result))
+    }
+  }
 
-case class VirtualHostStatus(parent:BrokerStatus, @BeanProperty id: String) extends Resource {
-  @GET
-  def get() = {
-    val rc = new VirtualHostStatusDTO
-    rc.id = id
-    rc
+  @Path("virtual-hosts/{id}/destinations/{dest}")
+  def destination(@PathParam("id") id : Long, @PathParam("dest") dest : Long):DestinationStatusDTO = {
+    with_virtual_host(id) { case (virtualHost,cb) =>
+      cb(virtualHost.router.destinations.valuesIterator.find { _.id == dest } map { node=>
+        val result = new DestinationStatusDTO
+        result.id = node.id
+        result.name = node.destination.getName.toString
+        result.domain = node.destination.getDomain.toString
+
+        node match {
+          case qdn:virtualHost.router.QueueDestinationNode =>
+            result.queues.add(qdn.queue.id)
+          case _ =>
+        }
+        result
+      })
+    }
   }
-}
 
-case class ConnectorStatus(parent:BrokerStatus, @BeanProperty id: String) extends Resource {
+  @Path("virtual-hosts/{id}/queues/{queue}")
+  def queue(@PathParam("id") id : Long, @PathParam("queue") qid : Long):QueueStatusDTO = {
+    with_virtual_host(id) { case (virtualHost,cb) =>
+      import JavaConversions._
+      virtualHost.queues.valuesIterator.find { _.id == qid } match {
+        case Some(q:Queue)=>
+          q.dispatchQueue {
+
+            val result = new QueueStatusDTO
+            result.id = q.id
+            result.capacity = q.capacity
+
+            result.enqueueItemCounter = q.enqueue_item_counter
+            result.dequeueItemCounter = q.dequeue_item_counter
+            result.enqueueSizeCounter = q.enqueue_size_counter
+            result.dequeueSizeCounter = q.dequeue_size_counter
+            result.nackItemCounter = q.nack_item_counter
+            result.nackSizeCounter = q.nack_size_counter
+
+            result.queueSize = q.queue_size
+            result.queueItems = q.queue_items
+
+            result.loadingSize = q.loading_size
+            result.flushingSize = q.flushing_size
+            result.flushedItems = q.flushed_items
+
+
+            var cur = q.head_entry
+            while( cur!=null ) {
+
+              val e = new EntryStatusDTO
+              e.seq = cur.seq
+              e.count = cur.count
+              e.size = cur.size
+              e.consumers = cur.parked.size
+              e.prefetched = cur.prefetched
+              e.state = cur.label
+
+              result.entries.add(e)
+
+              cur = if( cur == q.tail_entry ) {
+                null
+              } else {
+                cur.nextOrTail
+              }
+            }
+
+            cb(Some(result))
+          }
+        case None=>
+          cb(None)
+      }
+    }
+  }
 
-  @GET
-  def get() = {
-    val rc = new ConnectorStatusDTO
-    rc.id = id
-    rc
+
+  @Path("connectors")
+  def connectors :Array[jl.Long] = {
+    val list: List[jl.Long] = get.connectors
+    list.toArray(new Array[jl.Long](list.size))
   }
-}
 
-case class ConnectionStatus(parent:BrokerStatus, @BeanProperty id:String) extends Resource {
+  @Path("connectors/{id}")
+  def connector(@PathParam("id") id : Long):ConnectorStatusDTO = {
+
+    Future[Option[ConnectorStatusDTO]] { cb=>
+      broker.connectors.find(_.id == id) match {
+        case Some(connector)=>
+          connector.dispatchQueue {
+            val result = new ConnectorStatusDTO
+            result.id = connector.id
+            result.state = connector.serviceState.toString
+            result.stateSince = connector.serviceState.since
+            result.config = connector.config
+
+            result.accepted = connector.accept_counter.get
+            connector.connections.keysIterator.foreach { id=>
+              result.connections.add(id)
+            }
+            cb(Some(result))
+          }
+        case None=> cb(None)
+      }
+    }.getOrElse(result(NOT_FOUND))
 
-  @GET
-  def get() = {
-    val rc = new ConnectionStatusDTO
-    rc.id = id
-    rc
   }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala?rev=961187&r1=961186&r2=961187&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/web/resources/Root.scala Wed Jul  7 04:15:31 2010
@@ -123,6 +123,6 @@ case class Broker(parent:Root, @BeanProp
   }
 
   @Path("status")
-  def status = BrokerStatus(this, id)
+  def status = BrokerStatus(this)
 }