You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:08:25 UTC

svn commit: r961134 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-hawtdb/src/main/scala/org/apache/...

Author: chirino
Date: Wed Jul  7 04:08:24 2010
New Revision: 961134

URL: http://svn.apache.org/viewvc?rev=961134&view=rev
Log:
tweaking stores to get better perf when there is no attached consumer

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala
      - copied, changed from r961133, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:08:24 2010
@@ -91,7 +91,7 @@ class Queue(val host: VirtualHost, val d
   val session_manager = new SinkMux[Delivery](messages, dispatchQueue, Delivery)
 
   // sequence numbers.. used to track what's in the store.
-  var message_seq_counter = 0L
+  var message_seq_counter = 1L
 
   val headEntry = new QueueEntry(this).tombstone
   var tailEntry = new QueueEntry(this)
@@ -121,16 +121,18 @@ class Queue(val host: VirtualHost, val d
 
   def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
     this.storeId = storeId
-    records.foreach { qer =>
-      val entry = new QueueEntry(Queue.this).flushed(qer)
-      entries.addLast(entry)
-    }
-    if( !entries.isEmpty ) {
-      message_seq_counter = entries.getTail.seq
-    }
-    counter = records.size
-    enqueue_counter += records.size
-    debug("restored: "+records.size )
+    if( !records.isEmpty ) {
+      records.foreach { qer =>
+        val entry = new QueueEntry(Queue.this).flushed(qer)
+        entries.addLast(entry)
+      }
+
+      message_seq_counter = records.last.queueSeq+1
+
+      counter = records.size
+      enqueue_counter += records.size
+      debug("restored: "+records.size )
+    }
   } >>: dispatchQueue
 
   object messages extends Sink[Delivery] {
@@ -149,15 +151,9 @@ class Queue(val host: VirtualHost, val d
 
         val entry = tailEntry
         tailEntry = new QueueEntry(Queue.this)
-        entry.created(next_message_seq, delivery)
-
-        if( delivery.ack!=null ) {
-          delivery.ack(delivery.storeBatch)
-        }
-        if (delivery.storeKey != -1) {
-          delivery.storeBatch.enqueue(entry.createQueueEntryRecord)
-          delivery.storeBatch.release
-        }
+        val queueDelivery = delivery.copy
+        entry.created(next_message_seq, queueDelivery)
+        queueDelivery.storeBatch = delivery.storeBatch
 
         size += entry.size
         entries.addLast(entry)
@@ -165,6 +161,11 @@ class Queue(val host: VirtualHost, val d
         enqueue_counter += 1
         enqueue_size += entry.size
 
+        // Do we need to do a persistent enqueue???
+        if (queueDelivery.storeBatch != null) {
+          queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
+        }
+
         var swap_check = false
         if( !entry.hasSubs ) {
           // we flush the entry out right away if it looks
@@ -192,6 +193,12 @@ class Queue(val host: VirtualHost, val d
           })
         }
 
+        // release the store batch...
+        if (queueDelivery.storeBatch != null) {
+          queueDelivery.storeBatch.release
+          queueDelivery.storeBatch = null
+        }
+
         true
       }
     }
@@ -267,19 +274,9 @@ class Queue(val host: VirtualHost, val d
         false
       } else {
 
-        // Called from the producer thread before the delivery is
-        // processed by the queue's thread.. We don't
-        // yet know the order of the delivery in the queue.
-        if (delivery.storeKey != -1) {
-          // If the message has a store id, then this delivery will
-          // need a tx to track the store changes.
-          if( delivery.storeBatch == null ) {
-            delivery.storeBatch = host.store.createStoreBatch
-          } else {
-            delivery.storeBatch.retain
-          }
+        if( delivery.storeBatch!=null ) {
+          delivery.storeBatch.retain
         }
-
         val rc = session.offer(delivery)
         assert(rc, "session should accept since it was not full")
         true
@@ -440,7 +437,8 @@ class Queue(val host: VirtualHost, val d
   }
 
   def drain_store_flushes() = {
-    store_flush_source.getData.foreach { entry =>
+    val data = store_flush_source.getData
+    data.foreach { entry =>
       flushingSize -= entry.size
 
       // by the time we get called back, subs my be interested in the entry
@@ -708,9 +706,6 @@ class QueueEntry(val queue:Queue) extend
         delivery.storeKey = tx.store(delivery.createMessageRecord)
         tx.enqueue(createQueueEntryRecord)
         tx.release
-        true
-      } else {
-        false
       }
     }
 
@@ -719,10 +714,18 @@ class QueueEntry(val queue:Queue) extend
         0
       } else {
         flushing=true
-        store
-        queue.host.store.flushMessage(ref) {
-          queue.store_flush_source.merge(QueueEntry.this)
+
+        if( delivery.storeBatch!=null ) {
+          delivery.storeBatch.eagerFlush(^{
+            queue.store_flush_source.merge(QueueEntry.this)
+          })
+        } else {
+          store
+          queue.host.store.flushMessage(ref) {
+            queue.store_flush_source.merge(QueueEntry.this)
+          }
         }
+
         size
       }
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 04:08:24 2010
@@ -182,7 +182,7 @@ class Router(val host:VirtualHost) exten
     } >>: dispatchQueue
 
   def connect(destination:Destination, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
-    val route = new DeliveryProducerRoute(destination, producer) {
+    val route = new DeliveryProducerRoute(this, destination, producer) {
       override def on_connected = {
         completed(this);
       }
@@ -228,7 +228,7 @@ trait Route extends Retained {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class DeliveryProducerRoute(val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
+class DeliveryProducerRoute(val router:Router, val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
 
   override protected def log = Router
   override def dispatchQueue = producer.dispatchQueue
@@ -294,22 +294,43 @@ class DeliveryProducerRoute(val destinat
 
   def full = overflow!=null
 
-  def offer(value:Delivery) = {
+  def offer(delivery:Delivery) = {
     if( full ) {
       false
     } else {
+      if( delivery.message.persistent && router.host.store!=null ) {
+        delivery.storeBatch = router.host.store.createStoreBatch
+        delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
+      }
+
       targets.foreach { target=>
-        if( !target.offer(value) ) {
+        if( !target.offer(delivery) ) {
           overflowSessions ::= target
         }
       }
+
       if( overflowSessions!=Nil ) {
-        overflow = value
+        overflow = delivery
+      } else {
+        delivered(delivery)
       }
       true
     }
   }
 
+  private def delivered(delivery: Delivery): Unit = {
+    if (delivery.ack != null) {
+      if (delivery.storeBatch != null) {
+        delivery.storeBatch.setDisposer(^ {delivery.ack(null)})
+      } else {
+        delivery.ack(null)
+      }
+    }
+    if (delivery.storeBatch != null) {
+      delivery.storeBatch.release
+    }
+  }
+
   val drainer = ^{
     if( overflow!=null ) {
       val original = overflowSessions;
@@ -320,6 +341,7 @@ class DeliveryProducerRoute(val destinat
         }
       }
       if( overflowSessions==Nil ) {
+        delivered(overflow)
         overflow = null
         refiller.run
       }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:08:24 2010
@@ -53,13 +53,13 @@ object VirtualHost extends Log {
     rc.enabled = true
     rc.hostNames.add("localhost")
 
-    val store = new CassandraStoreDTO
-    store.hosts.add("localhost:9160")
+//    val store = new CassandraStoreDTO
+//    store.hosts.add("localhost:9160")
 
 //    val store = new HawtDBStoreDTO
 //    store.directory = new File("activemq-data")
     
-    rc.store = store
+    rc.store = null
     rc
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:08:24 2010
@@ -22,7 +22,6 @@ import com.shorrockin.cascal.session._
 import java.util.concurrent.atomic.AtomicLong
 import collection.mutable.ListBuffer
 import java.util.HashMap
-import org.apache.activemq.apollo.util.IntCounter
 import org.apache.activemq.apollo.store.{QueueEntryRecord, MessageRecord, QueueStatus, QueueRecord}
 import org.apache.activemq.apollo.broker.{Logging, Log, BaseService}
 import org.apache.activemq.apollo.dto.{CassandraStoreDTO, StoreDTO}
@@ -31,7 +30,8 @@ import org.apache.activemq.apollo.broker
 import com.shorrockin.cascal.utils.Conversions._
 import org.fusesource.hawtdispatch.ScalaDispatch._
 import ReporterLevel._
-import java.util.concurrent.{ThreadFactory, TimeUnit, Executors, ExecutorService}
+import java.util.concurrent._
+import org.apache.activemq.apollo.util.{TimeCounter, IntCounter}
 
 object CassandraStore extends Log {
 
@@ -71,12 +71,12 @@ class CassandraStore extends Store with 
   /////////////////////////////////////////////////////////////////////
   val dispatchQueue = createQueue("cassandra store")
 
-  var next_queue_key = new AtomicLong(0)
-  var next_msg_key = new AtomicLong(0)
+  var next_queue_key = new AtomicLong(1)
+  var next_msg_key = new AtomicLong(1)
 
   val client = new CassandraClient()
   var config:CassandraStoreDTO = defaultConfig
-  var blocking:BlockingSupport = null
+  var blocking:ThreadPoolExecutor = null
 
   def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
 
@@ -94,7 +94,13 @@ class CassandraStore extends Store with 
 
   protected def _start(onCompleted: Runnable) = {
 
-    blocking = new BlockingSupport
+    blocking = new ThreadPoolExecutor(4, 20, 1, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), new ThreadFactory(){
+      def newThread(r: Runnable) = {
+        val rc = new Thread(r, "cassandra client")
+        rc.setDaemon(true)
+        rc
+      }
+    })
     client.schema = Schema(config.keyspace)
 
     // TODO: move some of this parsing code into validation too.
@@ -109,82 +115,12 @@ class CassandraStore extends Store with 
     }.toList
 
     client.start
+    schedualDisplayStats
     onCompleted.run
   }
 
   protected def _stop(onCompleted: Runnable) = {
-    blocking.setDisposer(^{
-      onCompleted
-    })
-    blocking.release
-  }
-
-
-  class BlockingSupport extends BaseRetained {
-
-    val name = "cassandra store worker"
-    var workerStackSize = 0
-    val max_workers = 20
-    var executing_workers=0
-
-    val dispatchQueue = createQueue()
-    val queued_executions = ListBuffer[()=>Unit]()
-    var executor_pool = Executors.newCachedThreadPool(new ThreadFactory {
-      def newThread(r: Runnable) = {
-        val thread = new Thread(null, r, name, workerStackSize)
-        thread.setDaemon(true)
-        thread
-      }
-    })
-
-    /**
-     * executes a blocking function in an async thread.
-     */
-    def apply( func: =>Unit ):Unit = {
-      assertRetained
-      dispatchQueue {
-        if( executing_workers >= max_workers ) {
-          queued_executions += func _
-        } else {
-          executing_workers += 1
-          execute(func _)
-        }
-      }
-    }
-
-    private def execute(func: ()=>Unit):Unit = {
-      executor_pool {
-        try {
-          func()
-        } finally {
-          execute_done
-        }
-      }
-    }
-
-    private def execute_done() = ^{
-      if ( queued_executions.isEmpty ) {
-        executing_workers -= 1
-      } else {
-        execute(queued_executions.head)
-        queued_executions.drop(1)
-      }
-      if( retained < 1 ) {
-        check_pool_disposed
-      }
-    } >>: dispatchQueue
-
-    override def dispose = ^{
-      check_pool_disposed
-    } >>: dispatchQueue
-
-    private def check_pool_disposed = {
-      if( executing_workers == 0 ) {
-        executor_pool.shutdown
-        super.dispose
-      }
-    }
-
+    blocking.shutdown
   }
 
 
@@ -193,6 +129,17 @@ class CassandraStore extends Store with 
   // Implementation of the BrokerDatabase interface
   //
   /////////////////////////////////////////////////////////////////////
+  val storeLatency = new TimeCounter
+  def schedualDisplayStats:Unit = {
+    def displayStats = {
+      if( serviceState.isStarted ) {
+        val cl = storeLatency.apply(true)
+        info("metrics: store latency: %,.3f ms", cl.avgTime(TimeUnit.MILLISECONDS))
+        schedualDisplayStats
+      }
+    }
+    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^{ displayStats })
+  }
 
   /**
    * Deletes all stored data from the store.
@@ -200,12 +147,14 @@ class CassandraStore extends Store with 
   def purge(callback: =>Unit) = {
     blocking {
       client.purge
+      next_queue_key.set(1)
+      next_msg_key.set(1)
       callback
     }
   }
 
   def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
-    val key = next_queue_key.incrementAndGet
+    val key = next_queue_key.getAndIncrement
     record.key = key
     blocking {
       client.addQueue(record)
@@ -249,13 +198,7 @@ class CassandraStore extends Store with 
     if( action == null ) {
       callback
     } else {
-      val prevDisposer = action.tx.getDisposer
-      action.tx.setDisposer(^{
-        callback
-        if(prevDisposer!=null) {
-          prevDisposer.run
-        }
-      })
+      action.tx.eagerFlush(callback _)
       flush(action.tx.txid)
     }
 
@@ -288,8 +231,11 @@ class CassandraStore extends Store with 
       }
     }
 
+    val txid:Int = next_tx_id.getAndIncrement
     var actions = Map[Long, MessageAction]()
-    var txid:Int = 0
+
+    var flushListeners = ListBuffer[Runnable]()
+    def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
 
     def rm(msg:Long) = {
       actions -= msg
@@ -302,7 +248,7 @@ class CassandraStore extends Store with 
     }
 
     def store(record: MessageRecord):Long = {
-      record.key = next_msg_key.incrementAndGet
+      record.key = next_msg_key.getAndIncrement
       val action = new MessageAction
       action.msg = record.key
       action.store = record
@@ -339,7 +285,11 @@ class CassandraStore extends Store with 
       transaction_source.merge(this)
     }
 
+
     def onPerformed() {
+      flushListeners.foreach { x=>
+        x.run()
+      }
       super.dispose
     }
   }
@@ -354,21 +304,14 @@ class CassandraStore extends Store with 
   var pendingEnqueues = new HashMap[(Long,Long), CassandraBatch#MessageAction]()
   var delayedTransactions = new HashMap[Int, CassandraBatch]()
 
-  var next_tx_id = new IntCounter
+  var next_tx_id = new IntCounter(1)
   
   def drain_transactions = {
     transaction_source.getData.foreach { tx =>
 
-      val tx_id = next_tx_id.incrementAndGet
-      tx.txid = tx_id
+      val tx_id = tx.txid
       delayedTransactions.put(tx_id, tx)
 
-      if( config.flushDelay > 0 ) {
-        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
-      } else {
-        dispatchQueue.dispatchAsync(^{flush(tx_id)})
-      }
-
       tx.actions.foreach { case (msg, action) =>
         if( action.store!=null ) {
           pendingStores.put(msg, action)
@@ -407,6 +350,12 @@ class CassandraStore extends Store with 
         }
       }
 
+      if( !tx.flushListeners.isEmpty || config.flushDelay <= 0 ) {
+        flush(tx_id)
+      } else {
+        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+      }
+
     }
   }
 
@@ -447,15 +396,16 @@ class CassandraStore extends Store with 
     }
 
     if( !txs.isEmpty ) {
-      // suspend so that we don't process more flush requests while
-      // we are concurrently executing a flush
-      flush_source.suspend
-      blocking {
-        client.store(txs)
-        txs.foreach { x=>
-          x.onPerformed
+      storeLatency.start { end =>
+        blocking {
+          client.store(txs)
+          dispatchQueue {
+            end()
+            txs.foreach { x=>
+              x.onPerformed
+            }
+          }
         }
-        flush_source.resume
       }
     }
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:08:24 2010
@@ -156,6 +156,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       pageFileFactory.setDrainOnClose(false)
       pageFileFactory.setSync(true)
       pageFileFactory.setUseWorkerThread(true)
+      pageFileFactory.setPageSize(512.toShort)
       pageFileFactory.open()
 
       withTx { tx =>
@@ -210,25 +211,25 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   }
 
   def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
-    var batch = List[TypeCreatable]()
+    var batch = ListBuffer[TypeCreatable]()
     txs.foreach {
       tx =>
         tx.actions.foreach {
           case (msg, action) =>
             if (action.messageRecord != null) {
               val update: AddMessage.Bean = action.messageRecord
-              batch ::= update
+              batch += update
             }
             action.enqueues.foreach {
               queueEntry =>
                 val update: AddQueueEntry.Bean = queueEntry
-                batch ::= update
+                batch += update
             }
             action.dequeues.foreach {
               queueEntry =>
                 val qid = queueEntry.queueKey
                 val seq = queueEntry.queueSeq
-                batch ::= new RemoveQueueEntry.Bean().setQueueKey(qid).setQueueSeq(seq)
+                batch += new RemoveQueueEntry.Bean().setQueueKey(qid).setQueueSeq(seq)
             }
         }
     }
@@ -239,9 +240,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     })
   }
 
-  def purge() = {
-    val update = new Purge.Bean()
-    store(update)
+  def purge(callback: Runnable) = {
+    store(new Purge.Bean(), callback)
   }
 
   def listQueues: Seq[Long] = {
@@ -356,7 +356,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     tracker.await
   }
 
-  private def store(updates: List[TypeCreatable], onComplete: Runnable): Unit = {
+  private def store(updates: Seq[TypeCreatable], onComplete: Runnable): Unit = {
     val batch = next_batch_id
     begin(batch)
     updates.foreach {
@@ -747,10 +747,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
               addAndGet(messageRefsIndex, new jl.Long(messageKey), 1)
             } else {
+              // TODO perhaps treat this like an update?
               error("Duplicate queue entry seq %d", x.getQueueSeq)
             }
           } else {
-            error("Duplicate queue entry message %d", x.getMessageKey)
+            error("Duplicate queue entry message %d was %d", x.getMessageKey, existing)
           }
         } else {
           error("Queue not found: %d", x.getQueueKey)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:08:24 2010
@@ -70,8 +70,8 @@ class HawtDBStore extends Store with Bas
   /////////////////////////////////////////////////////////////////////
   val dispatchQueue = createQueue("hawtdb store")
 
-  var next_queue_key = new AtomicLong(0)
-  var next_msg_key = new AtomicLong(0)
+  var next_queue_key = new AtomicLong(1)
+  var next_msg_key = new AtomicLong(1)
 
   var executor_pool:ExecutorService = _
   var config:HawtDBStoreDTO = defaultConfig
@@ -125,14 +125,15 @@ class HawtDBStore extends Store with Bas
    * Deletes all stored data from the store.
    */
   def purge(callback: =>Unit) = {
-    executor_pool ^{
-      client.purge
+    client.purge(^{
+      next_queue_key.set(1)
+      next_msg_key.set(1)
       callback
-    }
+    })
   }
 
   def addQueue(record: QueueRecord)(callback: (Option[Long]) => Unit) = {
-    val key = next_queue_key.incrementAndGet
+    val key = next_queue_key.getAndIncrement
     record.key = key
     executor_pool ^{
       client.addQueue(record)
@@ -171,22 +172,19 @@ class HawtDBStore extends Store with Bas
     }
   }
 
-  def flushMessage(id: Long)(callback: => Unit) = ^{
+  private def doFlush(id: Long, callback:Runnable) = {
     val action: HawtDBBatch#MessageAction = pendingStores.get(id)
     if( action == null ) {
-      callback
+      callback.run
     } else {
-      val prevDisposer = action.tx.getDisposer
-      action.tx.setDisposer(^{
-        callback
-        if(prevDisposer!=null) {
-          prevDisposer.run
-        }
-      })
+      action.tx.eagerFlush(callback)
       flush(action.tx.txid)
     }
-
-  } >>: dispatchQueue
+  }
+  
+  def flushMessage(id: Long)(callback: => Unit) = dispatchQueue {
+    doFlush(id, ^{ callback } )
+  }
 
   def createStoreBatch() = new HawtDBBatch
 
@@ -217,8 +215,11 @@ class HawtDBStore extends Store with Bas
       }
     }
 
+    val txid:Int = next_tx_id.getAndIncrement
     var actions = Map[Long, MessageAction]()
-    var txid:Int = 0
+
+    var flushListeners = ListBuffer[Runnable]()
+    def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
 
     def rm(msg:Long) = {
       actions -= msg
@@ -231,7 +232,7 @@ class HawtDBStore extends Store with Bas
     }
 
     def store(record: MessageRecord):Long = {
-      record.key = next_msg_key.incrementAndGet
+      record.key = next_msg_key.getAndIncrement
       val action = new MessageAction
       action.msg = record.key
       action.messageRecord = record
@@ -269,13 +270,13 @@ class HawtDBStore extends Store with Bas
       transaction_source.merge(this)
     }
 
-    def onPerformed() {
+    def onPerformed() = this.synchronized {
       metric_commit_counter += 1
       val t = TimeUnit.NANOSECONDS.toMillis(System.nanoTime-dispose_start)
-      if( t < 0 ) {
-        println("wtf")
-      }
       metric_commit_latency_counter += t
+      flushListeners.foreach { x=>
+        x.run
+      }
       super.dispose
     }
   }
@@ -302,8 +303,7 @@ class HawtDBStore extends Store with Bas
   var pendingEnqueues = new HashMap[(Long,Long), HawtDBBatch#MessageAction]()
   var delayedTransactions = new HashMap[Int, HawtDBBatch]()
 
-  var next_tx_id = new IntCounter
-
+  var next_tx_id = new IntCounter(1)
 
   def schedualDisplayStats:Unit = {
     val st = System.nanoTime
@@ -329,15 +329,7 @@ class HawtDBStore extends Store with Bas
   def drain_transactions = {
     transaction_source.getData.foreach { tx =>
 
-      val tx_id = next_tx_id.incrementAndGet
-      tx.txid = tx_id
-      delayedTransactions.put(tx_id, tx)
-
-      if( config.flushDelay > 0 ) {
-        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
-      } else {
-        dispatchQueue.dispatchAsync(^{flush(tx_id)})
-      }
+      delayedTransactions.put(tx.txid, tx)
 
       tx.actions.foreach { case (msg, action) =>
         if( action.messageRecord!=null ) {
@@ -380,6 +372,13 @@ class HawtDBStore extends Store with Bas
         }
       }
 
+      val tx_id = tx.txid
+      if( !tx.flushListeners.isEmpty || config.flushDelay <= 0 ) {
+        flush(tx_id)
+      } else {
+        dispatchQueue.dispatchAfter(config.flushDelay, TimeUnit.MILLISECONDS, ^{flush(tx_id)})
+      }
+
     }
   }
 
@@ -402,7 +401,6 @@ class HawtDBStore extends Store with Bas
       // Message may be flushed or canceled before the timeout flush event..
       // tx may be null in those cases
       if (tx!=null) {
-
         tx.actions.foreach { case (msg, action) =>
           if( action.messageRecord !=null ) {
             metric_flushed_message_counter += 1
@@ -422,8 +420,6 @@ class HawtDBStore extends Store with Bas
     }
 
     if( !txs.isEmpty ) {
-      // suspend so that we don't process more flush requests while
-      // we are concurrently executing a flush
       client.store(txs)
     }
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul  7 04:08:24 2010
@@ -18,6 +18,8 @@ package org.apache.activemq.apollo.stomp
 
 import org.apache.activemq.transport.TransportFactory
 import org.apache.activemq.apollo.broker.{LoggingTracker, Broker}
+import java.io.File
+import org.apache.activemq.apollo.dto.{CassandraStoreDTO, HawtDBStoreDTO}
 
 /**
  */
@@ -37,6 +39,14 @@ object StompBroker {
     connector.protocol = "stomp"
     connector.advertise = uri
 
+    val store = new CassandraStoreDTO
+    store.hosts.add("localhost:9160")
+
+//    val store = new HawtDBStoreDTO
+//    store.directory = new File("activemq-data")
+    
+    broker.config.virtualHosts.get(0).store = store
+
     val tracker = new LoggingTracker("broker startup")
     tracker.start(broker)
     tracker.await
@@ -48,4 +58,5 @@ object StompBroker {
     println("Shutdown complete.")
   }
 
+  
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:08:24 2010
@@ -280,10 +280,12 @@ class StompProtocolHandler extends Proto
       delivery.message = message
       delivery.size = message.frame.size
 
-      if( message.persistent && host.store!=null ) {
-        storeBatch = host.store.createStoreBatch
-        delivery.storeBatch = storeBatch
-        delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
+      // User might be asking for an ack that we have processed the message..
+      val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
+      if( receipt!=null ) {
+        delivery.ack = { storeTx =>
+          connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+        }
       }
 
       // routes can always accept at least 1 delivery...
@@ -299,24 +301,7 @@ class StompProtocolHandler extends Proto
       // info("Dropping message.  No consumers interested in message.")
     }
 
-    // User might be asking for ack that we have prcoessed the message..
-    val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
-    if( receipt!=null ) {
-      if( storeBatch==null ) {
-        // message was not persistent we can ack back right away..
-        connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
-      } else {
-        // else lets ack back once the persistent operations are processed.
-        storeBatch.setDisposer(^{
-          connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
-        })
-      }
-    }
 
-    if( storeBatch!=null ) {
-      // We can now release the batch as we are done using it..
-      storeBatch.release
-    }
   }
 
   def on_stomp_subscribe(headers:HeaderMap) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:08:24 2010
@@ -60,10 +60,16 @@ trait StoreBatch extends Retained {
    */
   def dequeue(entry:QueueEntryRecord)
 
+
+  /**
+   * Causes the batch to flush eagerly; `callback` is called once it has been flushed.
+   */
+  def eagerFlush(callback: Runnable)
+
 }
 
 /**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ *  @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait Store extends Service {
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala Wed Jul  7 04:08:24 2010
@@ -22,7 +22,14 @@ package org.apache.activemq.apollo.util
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class IntCounter(private var value:Int = 0) {
+class IntCounter(private var value:Int = 0) extends MetricProducer[Int] {
+
+  def apply(reset:Boolean):Int = { // returns the current count, clearing it only on request
+    val rc = value
+    if( reset ) value = 0 // reset=false must leave the count untouched
+    rc
+  }
+  def clear(): Unit = { value = 0 } // zeroes the counter
 
   def get() = value
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?rev=961134&r1=961133&r2=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Wed Jul  7 04:08:24 2010
@@ -22,7 +22,14 @@ package org.apache.activemq.apollo.util
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class LongCounter(private var value:Long = 0) {
+class LongCounter(private var value:Long = 0) extends MetricProducer[Long] {
+
+  def apply(reset:Boolean):Long = { // returns the current count, clearing it only on request
+    val rc = value
+    if( reset ) value = 0 // reset=false must leave the count untouched
+    rc
+  }
+  def clear(): Unit = { value = 0 } // zeroes the counter
 
   def get() = value
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala (from r961133, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala&r1=961133&r2=961134&rev=961134&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/MetricProducer.scala Wed Jul  7 04:08:24 2010
@@ -17,28 +17,21 @@
 package org.apache.activemq.apollo.util
 
 /**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ * The MetricProducer trait is implemented by objects which
+ * measure metrics. 
  */
-class IntCounter(private var value:Int = 0) {
+trait MetricProducer[T] extends ((Boolean) => T) {
 
-  def get() = value
+  /**
+   * Returns the current metric value and, when `clear` is
+   * true, also clears it.
+   */
+  def apply(clear: Boolean):T
 
-  def incrementAndGet() = addAndGet(1)
-  def decrementAndGet() = addAndGet(-1)
-  def addAndGet(amount:Int) = {
-    value+=amount
-    value
-  }
+  /**
+   * Clears the metric value.
+   */
+  def clear()
+}
 
-  def getAndIncrement() = getAndAdd(1)
-  def getAndDecrement() = getAndAdd(-11)
-  def getAndAdd(amount:Int) = {
-    val rc = value
-    value+=amount
-    rc
-  }
 
-}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala?rev=961134&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/util/TimeCounter.scala Wed Jul  7 04:08:24 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.util
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * <p>A TimeCounter collects time durations and produces a TimeMetric.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class TimeCounter extends MetricProducer[TimeMetric] {
+
+  private var maximum = Math.MIN_LONG
+  private var minimum = Math.MAX_LONG
+  private var total = 0L
+  private var count = 0
+
+  /** Snapshots the collected durations; also clears them when `reset` is true. */
+  def apply(reset: Boolean):TimeMetric = {
+    val rc = TimeMetric(count, total, minimum, maximum)
+    if (reset) clear()
+    rc
+  }
+
+  /** Drops all samples; min/max return to the same Long sentinels the fields start with. */
+  def clear() = {
+    maximum = Math.MIN_LONG
+    minimum = Math.MAX_LONG
+    total = 0L
+    count = 0
+  }
+
+  /**
+   * Adds a duration to the running min/max/total/count; negative values are ignored.
+   */
+  def +=(value: Long): Unit = {
+    if (value > -1) {
+      maximum = value max maximum
+      minimum = value min minimum
+      total += value
+      count += 1
+    }
+  }
+
+  /**
+   * Evaluates `func` and records its elapsed nanoseconds, even if it throws.
+   */
+  def time[T](func: => T): T = {
+    val startTime = System.nanoTime
+    try {
+      func
+    } finally {
+      this += System.nanoTime - startTime
+    }
+  }
+
+  /**
+   * Notes the start time, then hands `func` a callback which, each time it is
+   * invoked, records the nanoseconds elapsed since that start.
+   */
+  def start[T](func: ( ()=>Unit )=> T): T = {
+    val startTime = System.nanoTime
+    def endFunc():Unit = {
+      this += System.nanoTime - startTime
+    }
+    func(endFunc)
+  }
+}
+
+case class TimeMetric(count:Int, total:Long, minimum:Long, maximum:Long) { // durations are treated as nanoseconds
+  def maxTime(unit:TimeUnit) = maximum.toFloat / unit.toNanos(1)   // largest sample, expressed in `unit`
+  def minTime(unit:TimeUnit) = minimum.toFloat / unit.toNanos(1)   // smallest sample, expressed in `unit`
+  def totalTime(unit:TimeUnit) = total.toFloat / unit.toNanos(1)   // sum of all samples, expressed in `unit`
+  def avgTime(unit:TimeUnit) = if( count!=0 ) totalTime(unit) / count else 0f // mean sample; 0 when there are none
+  def avgFrequency(unit:TimeUnit) = 1f / avgTime(unit)             // samples per `unit`; Infinity when avgTime is 0
+}
\ No newline at end of file