You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/30 15:00:33 UTC

svn commit: r980772 - in /activemq/activemq-apollo/trunk: apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/ apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/ apollo-store/src/main/scala/org/apache/activemq/...

Author: chirino
Date: Fri Jul 30 13:00:33 2010
New Revision: 980772

URL: http://svn.apache.org/viewvc?rev=980772&view=rev
Log:
Extracted out a common DelayingStoreSupport trait that the hawtdb and cassandra store impls can share.

Added:
    activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala
    activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala

Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala?rev=980772&r1=980771&r2=980772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala Fri Jul 30 13:00:33 2010
@@ -166,7 +166,7 @@ class CassandraClient() {
   }
 
 
-  def store(txs:Seq[CassandraStore#CassandraUOW]) {
+  def store(txs:Seq[DelayingStoreSupport#DelayableUOW]) {
     withSession {
       session =>
         var operations = List[Operation]()
@@ -175,8 +175,8 @@ class CassandraClient() {
             tx.actions.foreach {
               case (msg, action) =>
                 var rc =
-                if (action.store != null) {
-                  operations ::= Insert( schema.message_data \ (msg, encodeMessageRecord(action.store) ) ) 
+                if (action.messageRecord != null) {
+                  operations ::= Insert( schema.message_data \ (msg, encodeMessageRecord(action.messageRecord) ) ) 
                 }
                 action.enqueues.foreach {
                   queueEntry =>

Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala?rev=980772&r1=980771&r2=980772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala Fri Jul 30 13:00:33 2010
@@ -57,24 +57,32 @@ object CassandraStore extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class CassandraStore extends Store with BaseService with Logging {
+class CassandraStore extends DelayingStoreSupport with Logging {
 
   import CassandraStore._
   override protected def log = CassandraStore
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Implementation of the BaseService interface
-  //
-  /////////////////////////////////////////////////////////////////////
-  val dispatchQueue = createQueue("cassandra store")
-
+                                
   var next_msg_key = new AtomicLong(1)
 
   val client = new CassandraClient()
   var config:CassandraStoreDTO = defaultConfig
   var blocking:ExecutorService = null
 
+  def flush_delay = config.flush_delay
+
+  override def toString = "cassandra store"
+
+  protected def get_next_msg_key = next_msg_key.getAndIncrement
+
+  protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
+    blocking {
+      client.store(uows)
+      dispatchQueue {
+        callback
+      }
+    }
+  }
+
   def configure(config: StoreDTO, reporter: Reporter):Unit = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
 
 
@@ -218,230 +226,5 @@ class CassandraStore extends Store with 
     }
   }
 
-  def flushMessage(id: Long)(callback: => Unit) = ^{
-    val action: CassandraUOW#MessageAction = pendingStores.get(id)
-    if( action == null ) {
-      callback
-    } else {
-      action.uow.onComplete(callback _)
-      flush(action.uow.uow_id)
-    }
-
-  } >>: dispatchQueue
-
-  def createStoreUOW() = new CassandraUOW
-
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Implementation of the StoreBatch interface
-  //
-  /////////////////////////////////////////////////////////////////////
-  class CassandraUOW extends BaseRetained with StoreUOW {
-
-    class MessageAction {
-
-      var msg= 0L
-      var store: MessageRecord = null
-      var enqueues = ListBuffer[QueueEntryRecord]()
-      var dequeues = ListBuffer[QueueEntryRecord]()
-
-      def uow = CassandraUOW.this
-      def isEmpty() = store==null && enqueues==Nil && dequeues==Nil
-      def cancel() = {
-        uow.rm(msg)
-        if( uow.isEmpty ) {
-          uow.cancel
-        }
-      }
-    }
-
-    val uow_id:Int = next_uow_id.getAndIncrement
-    var actions = Map[Long, MessageAction]()
-    var flushing= false
-
-    var completeListeners = ListBuffer[Runnable]()
-
-    def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
-
-    var disableDelay = false
-    def completeASAP() = this.synchronized { disableDelay=true }
-
-    def delayable = !disableDelay
-
-
-    def rm(msg:Long) = {
-      actions -= msg
-    }
-
-    def isEmpty = actions.isEmpty
-    def cancel = {
-      delayedUOWs.remove(uow_id)
-      onPerformed
-    }
-
-    def store(record: MessageRecord):Long = {
-      record.key = next_msg_key.getAndIncrement
-      val action = new MessageAction
-      action.msg = record.key
-      action.store = record
-      this.synchronized {
-        actions += record.key -> action
-      }
-      dispatchQueue {
-        pendingStores.put(record.key, action)
-      }
-      record.key
-    }
-
-    def action(msg:Long) = {
-      actions.get(msg) match {
-        case Some(x) => x
-        case None =>
-          val x = new MessageAction
-          x.msg = msg
-          actions += msg->x
-          x
-      }
-    }
-
-    def enqueue(entry: QueueEntryRecord) = {
-      this.synchronized {
-        val a = action(entry.messageKey)
-        a.enqueues += entry
-        dispatchQueue {
-          pendingEnqueues.put(key(entry), a)
-        }
-      }
-    }
-
-    def dequeue(entry: QueueEntryRecord) = {
-      this.synchronized {
-        action(entry.messageKey).dequeues += entry
-      }
-    }
-
-    override def dispose = {
-      uow_source.merge(this)
-    }
-
-
-    def onPerformed() {
-      completeListeners.foreach { x=>
-        x.run()
-      }
-      super.dispose
-    }
-  }
-
-  def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
-
-  val uow_source = createSource(new ListEventAggregator[CassandraUOW](), dispatchQueue)
-  uow_source.setEventHandler(^{drain_uows});
-  uow_source.resume
-
-  var pendingStores = new HashMap[Long, CassandraUOW#MessageAction]()
-  var pendingEnqueues = new HashMap[(Long,Long), CassandraUOW#MessageAction]()
-  var delayedUOWs = new HashMap[Int, CassandraUOW]()
-
-  var next_uow_id = new IntCounter(1)
-  
-  def drain_uows = {
-    uow_source.getData.foreach { uow =>
-
-      val uow_id = uow.uow_id
-      delayedUOWs.put(uow_id, uow)
-
-      uow.actions.foreach { case (msg, action) =>
-
-        // dequeues can cancel out previous enqueues
-        action.dequeues.foreach { currentDequeue=>
-          val currentKey = key(currentDequeue)
-          val prevAction:CassandraUOW#MessageAction = pendingEnqueues.remove(currentKey)
-          if( prevAction!=null && !prevAction.uow.flushing ) {
-
-            // yay we can cancel out a previous enqueue
-            prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
-
-            // if the message is not in any queues.. we can gc it..
-            if( prevAction.enqueues == Nil && prevAction.store !=null ) {
-              pendingStores.remove(msg)
-              prevAction.store = null
-            }
-
-            // Cancel the action if it's now empty
-            if( prevAction.isEmpty ) {
-              prevAction.cancel()
-            }
-
-            // since we canceled out the previous enqueue.. now cancel out the action
-            action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
-            if( action.isEmpty ) {
-              action.cancel()
-            }
-          }
-        }
-      }
-
-      if( !uow.completeListeners.isEmpty || config.flush_delay <= 0 ) {
-        flush(uow_id)
-      } else {
-        dispatchQueue.dispatchAfter(config.flush_delay, TimeUnit.MILLISECONDS, ^{flush(uow_id)})
-      }
-
-    }
-  }
-
-  def flush(uow_id:Int) = {
-    flush_source.merge(uow_id)
-  }
-
-  val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
-  flush_source.setEventHandler(^{drain_flushes});
-  flush_source.resume
-
-  def drain_flushes:Unit = {
-
-    if( !serviceState.isStarted ) {
-      return
-    }
-    
-    val uows = flush_source.getData.flatMap{ uow_id =>
-      val uow = delayedUOWs.remove(uow_id)
-      // Message may be flushed or canceled before the timeout flush event..
-      // uow may be null in those cases
-      if (uow!=null) {
-        uow.flushing = true
-        Some(uow)
-      } else {
-        None
-      }
-    }
-
-    if( !uows.isEmpty ) {
-      storeLatency.start { end =>
-        blocking {
-          client.store(uows)
-          dispatchQueue {
-            end()
-            uows.foreach { uow=>
-
-              uow.actions.foreach { case (msg, action) =>
-                if( action.store!=null ) {
-                  pendingStores.remove(msg)
-                }
-                action.enqueues.foreach { queueEntry=>
-                  val k = key(queueEntry)
-                  pendingEnqueues.remove(k)
-                }
-              }
-
-              uow.onPerformed
-            }
-          }
-        }
-      }
-    }
-  }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala?rev=980772&r1=980771&r2=980772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala Fri Jul 30 13:00:33 2010
@@ -235,7 +235,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     _store(update, callback)
   }
 
-  def store(txs: Seq[HawtDBStore#HawtDBUOW], callback:Runnable) {
+  def store(txs: Seq[HawtDBStore#DelayableUOW], callback:Runnable) {
     var batch = ListBuffer[TypeCreatable]()
     txs.foreach {
       tx =>

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala?rev=980772&r1=980771&r2=980772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala Fri Jul 30 13:00:33 2010
@@ -16,19 +16,18 @@
  */
 package org.apache.activemq.apollo.store.hawtdb
 
-import org.fusesource.hawtdispatch.BaseRetained
 import java.util.concurrent.atomic.AtomicLong
 import collection.mutable.ListBuffer
 import java.util.HashMap
 import collection.{Seq}
 import org.fusesource.hawtdispatch.ScalaDispatch._
-import org.fusesource.hawtdispatch.ListEventAggregator
 import java.io.File
 import java.util.concurrent._
 import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.util._
 import ReporterLevel._
+import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained, ListEventAggregator}
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -57,18 +56,11 @@ object HawtDBStore extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBStore extends Store with BaseService with DispatchLogging {
+class HawtDBStore extends DelayingStoreSupport with DispatchLogging {
 
   import HawtDBStore._
   override protected def log = HawtDBStore
 
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Implementation of the BaseService interface
-  //
-  /////////////////////////////////////////////////////////////////////
-  val dispatchQueue = createQueue("hawtdb store")
-
   var next_queue_key = new AtomicLong(1)
   var next_msg_key = new AtomicLong(1)
 
@@ -76,6 +68,22 @@ class HawtDBStore extends Store with Bas
   var config:HawtDBStoreDTO = defaultConfig
   val client = new HawtDBClient(this)
 
+  override def toString = "hawtdb store"
+
+  def flush_delay = config.flush_delay
+  
+  protected def get_next_msg_key = next_msg_key.getAndIncrement
+
+  protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
+    executor_pool {
+      client.store(uows, ^{
+        dispatchQueue {
+          callback
+        }
+      })
+    }
+  }
+
   def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[HawtDBStoreDTO], reporter)
 
   def configure(config: HawtDBStoreDTO, reporter: Reporter) = {
@@ -173,8 +181,6 @@ class HawtDBStore extends Store with Bas
     }
   }
 
-  
-
   val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
   load_source.setEventHandler(^{drain_loads});
   load_source.resume
@@ -209,158 +215,6 @@ class HawtDBStore extends Store with Bas
     }
   }
 
-  def flushMessage(messageKey: Long)(cb: => Unit) = dispatchQueue {
-    val action: HawtDBUOW#MessageAction = pendingStores.get(messageKey)
-    if( action == null ) {
-      cb
-    } else {
-      action.uow.onComplete(^{ cb })
-      flush(action.uow.uow_id)
-    }
-  }
-
-  def createStoreUOW() = new HawtDBUOW
-
-
-  /////////////////////////////////////////////////////////////////////
-  //
-  // Implementation of the StoreBatch interface
-  //
-  /////////////////////////////////////////////////////////////////////
-  class HawtDBUOW extends BaseRetained with StoreUOW {
-
-    var dispose_start:Long = 0
-    var flushing = false;
-
-    class MessageAction {
-
-      var msg= 0L
-      var messageRecord: MessageRecord = null
-      var enqueues = ListBuffer[QueueEntryRecord]()
-      var dequeues = ListBuffer[QueueEntryRecord]()
-
-      def uow = HawtDBUOW.this
-      def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
-
-      def cancel() = {
-        uow.rm(msg)
-      }
-    }
-
-    val uow_id:Int = next_batch_id.getAndIncrement
-    var actions = Map[Long, MessageAction]()
-
-    var completeListeners = ListBuffer[Runnable]()
-    var disableDelay = false
-
-    def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
-
-    def completeASAP() = this.synchronized { disableDelay=true }
-
-    var delayable_actions = 0
-
-    def delayable = !disableDelay && delayable_actions>0 && config.flush_delay>=0
-
-    def rm(msg:Long) = {
-      actions -= msg
-      if( actions.isEmpty ) {
-        cancel
-      }
-    }
-
-    def cancel = {
-      delayedUOWs.remove(uow_id)
-      onPerformed
-    }
-
-    def store(record: MessageRecord):Long = {
-      record.key = next_msg_key.getAndIncrement
-      val action = new MessageAction
-      action.msg = record.key
-      action.messageRecord = record
-      this.synchronized {
-        actions += record.key -> action
-      }
-      dispatchQueue {
-        pendingStores.put(record.key, action)
-      }
-      delayable_actions += 1
-      record.key
-    }
-
-    def action(msg:Long) = {
-      actions.get(msg) match {
-        case Some(x) => x
-        case None =>
-          val x = new MessageAction
-          x.msg = msg
-          actions += msg->x
-          x
-      }
-    }
-
-    def enqueue(entry: QueueEntryRecord) = {
-      val a = this.synchronized {
-        val a = action(entry.messageKey)
-        a.enqueues += entry
-        delayable_actions += 1
-        a
-      }
-      dispatchQueue {
-        pending_enqueues.put(key(entry), a)
-      }
-
-    }
-
-    def dequeue(entry: QueueEntryRecord) = {
-      this.synchronized {
-        action(entry.messageKey).dequeues += entry
-      }
-    }
-
-    override def dispose = {
-      dispose_start = System.nanoTime
-      uow_source.merge(this)
-    }
-
-    def onPerformed() = this.synchronized {
-      commit_latency_counter += System.nanoTime-dispose_start
-      completeListeners.foreach { x=>
-        x.run
-      }
-      super.dispose
-    }
-  }
-
-  var metric_canceled_message_counter:Long = 0
-  var metric_canceled_enqueue_counter:Long = 0
-  var metric_flushed_message_counter:Long = 0
-  var metric_flushed_enqueue_counter:Long = 0
-
-  val commit_latency_counter = new TimeCounter
-  var commit_latency = commit_latency_counter(false)
-
-  val message_load_latency_counter = new TimeCounter
-  var message_load_latency = message_load_latency_counter(false)
-
-  val message_load_batch_size_counter = new IntMetricCounter
-  var message_load_batch_size = message_load_batch_size_counter(false)
-
-  var canceled_add_message:Long = 0
-  var canceled_enqueue:Long = 0
-
-
-  def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
-
-  val uow_source = createSource(new ListEventAggregator[HawtDBUOW](), dispatchQueue)
-  uow_source.setEventHandler(^{drain_uows});
-  uow_source.resume
-
-  var pendingStores = new HashMap[Long, HawtDBUOW#MessageAction]()
-  var pending_enqueues = new HashMap[(Long,Long), HawtDBUOW#MessageAction]()
-  var delayedUOWs = new HashMap[Int, HawtDBUOW]()
-
-  var next_batch_id = new IntCounter(1)
 
   implicit def toTimeMetricDTO( m: TimeMetric) = {
     val rc = new TimeMetricDTO()
@@ -380,28 +234,6 @@ class HawtDBStore extends Store with Bas
     rc
   }
 
-  def storeStatusDTO(callback:(StoreStatusDTO)=>Unit) = dispatchQueue {
-    val rc = new HawtDBStoreStatusDTO
-
-    rc.state = serviceState.toString
-    rc.state_since = serviceState.since
-
-    rc.flush_latency = flush_latency
-    rc.message_load_latency = message_load_latency
-    rc.message_load_batch_size = message_load_batch_size
-
-    rc.journal_append_latency = client.metric_journal_append
-    rc.index_update_latency = client.metric_index_update
-
-    rc.canceled_message_counter = metric_canceled_message_counter
-    rc.canceled_enqueue_counter = metric_canceled_enqueue_counter
-    rc.flushed_message_counter = metric_flushed_message_counter
-    rc.flushed_enqueue_counter = metric_flushed_enqueue_counter
-
-    callback(rc)
-  }
-
-
   def poll_stats:Unit = {
     def displayStats = {
       if( serviceState.isStarted ) {
@@ -416,126 +248,28 @@ class HawtDBStore extends Store with Bas
         poll_stats
       }
     }
-    
-    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
-  }
-
-  def drain_uows = {
-    uow_source.getData.foreach { uow =>
-
-      delayedUOWs.put(uow.uow_id, uow)
-
-      uow.actions.foreach { case (msg, action) =>
-
-        // dequeues can cancel out previous enqueues
-        action.dequeues.foreach { currentDequeue=>
-          val currentKey = key(currentDequeue)
-          val prev_action:HawtDBUOW#MessageAction = pending_enqueues.remove(currentKey)
-
-          def prev_batch = prev_action.uow
-          
-          if( prev_action!=null && !prev_batch.flushing ) {
-
-
-            prev_batch.delayable_actions -= 1
-            metric_canceled_enqueue_counter += 1
-
-            // yay we can cancel out a previous enqueue
-            prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == currentKey )
-
-            // if the message is not in any queues.. we can gc it..
-            if( prev_action.enqueues == Nil && prev_action.messageRecord !=null ) {
-              pendingStores.remove(msg)
-              prev_action.messageRecord = null
-              prev_batch.delayable_actions -= 1
-              metric_canceled_message_counter += 1
-            }
-
-            // Cancel the action if it's now empty
-            if( prev_action.isEmpty ) {
-              prev_action.cancel()
-            } else if( !prev_batch.delayable ) {
-              // flush it if there is no point in delyaing anymore
-              flush(prev_batch.uow_id)
-            }
-
-            // since we canceled out the previous enqueue.. now cancel out the action
-            action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
-            if( action.isEmpty ) {
-              action.cancel()
-            }
-          }
-        }
-      }
-
-      val batch_id = uow.uow_id
-      if( uow.delayable ) {
-        dispatchQueue.dispatchAfter(config.flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
-      } else {
-        flush(batch_id)
-      }
-
-    }
-  }
 
-  def flush(batch_id:Int) = {
-    flush_source.merge(batch_id)
+    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
   }
 
-  val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
-  flush_source.setEventHandler(^{drain_flushes});
-  flush_source.resume
+  def storeStatusDTO(callback:(StoreStatusDTO)=>Unit) = dispatchQueue {
+    val rc = new HawtDBStoreStatusDTO
 
-  val flush_latency_counter = new TimeCounter
-  var flush_latency = flush_latency_counter(false)
+    rc.state = serviceState.toString
+    rc.state_since = serviceState.since
 
-  def drain_flushes:Unit = {
+    rc.flush_latency = flush_latency
+    rc.message_load_latency = message_load_latency
+    rc.message_load_batch_size = message_load_batch_size
 
-    if( !serviceState.isStarted ) {
-      return
-    }
-    
-    val uows = flush_source.getData.flatMap{ uow_id =>
+    rc.journal_append_latency = client.metric_journal_append
+    rc.index_update_latency = client.metric_index_update
 
-      val uow = delayedUOWs.remove(uow_id)
-      // Message may be flushed or canceled before the timeout flush event..
-      // uow may be null in those cases
-      if (uow!=null) {
-        uow.flushing = true
-        Some(uow)
-      } else {
-        None
-      }
-    }
+    rc.canceled_message_counter = metric_canceled_message_counter
+    rc.canceled_enqueue_counter = metric_canceled_enqueue_counter
+    rc.flushed_message_counter = metric_flushed_message_counter
+    rc.flushed_enqueue_counter = metric_flushed_enqueue_counter
 
-    if( !uows.isEmpty ) {
-      flush_latency_counter.start { end=>
-        executor_pool {
-          client.store(uows, ^{
-            dispatchQueue {
-
-              end()
-              uows.foreach { uow=>
-
-                uow.actions.foreach { case (msg, action) =>
-                  if( action.messageRecord !=null ) {
-                    metric_flushed_message_counter += 1
-                    pendingStores.remove(msg)
-                  }
-                  action.enqueues.foreach { queueEntry=>
-                    metric_flushed_enqueue_counter += 1
-                    val k = key(queueEntry)
-                    pending_enqueues.remove(k)
-                  }
-                }
-                uow.onPerformed
-
-              }
-            }
-          })
-        }
-      }
-    }
+    callback(rc)
   }
-
 }

Added: activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala?rev=980772&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala Fri Jul 30 13:00:33 2010
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.store
+
+import collection.mutable.ListBuffer
+import java.util.HashMap
+import collection.Seq
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import java.util.concurrent._
+import org.apache.activemq.apollo.util._
+import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator}
+
+/**
+ * <p>
+ * Support class for implementing Stores which delay performing updates
+ * so that an update can potentially be canceled by a subsequent
+ * operation before it is written.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait DelayingStoreSupport extends Store with BaseService {
+
+  protected def flush_delay:Long
+
+  protected def get_next_msg_key:Long
+
+  protected def store(uows: Seq[DelayableUOW])(callback: =>Unit):Unit
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the BaseService interface
+  //
+  /////////////////////////////////////////////////////////////////////
+  val dispatchQueue = createQueue(toString)
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the StoreUOW interface
+  //
+  /////////////////////////////////////////////////////////////////////
+  def createStoreUOW() = new DelayableUOW
+
+  class DelayableUOW extends BaseRetained with StoreUOW {
+
+    var dispose_start:Long = 0
+    var flushing = false;
+
+    class MessageAction {
+
+      var msg= 0L
+      var messageRecord: MessageRecord = null
+      var enqueues = ListBuffer[QueueEntryRecord]()
+      var dequeues = ListBuffer[QueueEntryRecord]()
+
+      def uow = DelayableUOW.this
+      def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
+
+      def cancel() = {
+        uow.rm(msg)
+      }
+    }
+
+    val uow_id:Int = next_batch_id.getAndIncrement
+    var actions = Map[Long, MessageAction]()
+
+    var completeListeners = ListBuffer[Runnable]()
+    var disableDelay = false
+
+    def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
+
+    def completeASAP() = this.synchronized { disableDelay=true }
+
+    var delayable_actions = 0
+
+    def delayable = !disableDelay && delayable_actions>0 && flush_delay>=0
+
+    def rm(msg:Long) = {
+      actions -= msg
+      if( actions.isEmpty ) {
+        cancel
+      }
+    }
+
+    def cancel = {
+      delayedUOWs.remove(uow_id)
+      onPerformed
+    }
+
+    def store(record: MessageRecord):Long = {
+      record.key = get_next_msg_key
+      val action = new MessageAction
+      action.msg = record.key
+      action.messageRecord = record
+      this.synchronized {
+        actions += record.key -> action
+      }
+      dispatchQueue {
+        pendingStores.put(record.key, action)
+      }
+      delayable_actions += 1
+      record.key
+    }
+
+    def action(msg:Long) = {
+      actions.get(msg) match {
+        case Some(x) => x
+        case None =>
+          val x = new MessageAction
+          x.msg = msg
+          actions += msg->x
+          x
+      }
+    }
+
+    def enqueue(entry: QueueEntryRecord) = {
+      val a = this.synchronized {
+        val a = action(entry.messageKey)
+        a.enqueues += entry
+        delayable_actions += 1
+        a
+      }
+      dispatchQueue {
+        pending_enqueues.put(key(entry), a)
+      }
+
+    }
+
+    def dequeue(entry: QueueEntryRecord) = {
+      this.synchronized {
+        action(entry.messageKey).dequeues += entry
+      }
+    }
+
+    override def dispose = {
+      dispose_start = System.nanoTime
+      uow_source.merge(this)
+    }
+
+    def onPerformed() = this.synchronized {
+      commit_latency_counter += System.nanoTime-dispose_start
+      completeListeners.foreach { x=>
+        x.run
+      }
+      super.dispose
+    }
+  }
+
+
+  def flushMessage(messageKey: Long)(cb: => Unit) = dispatchQueue {
+    val action: DelayableUOW#MessageAction = pendingStores.get(messageKey)
+    if( action == null ) {
+      cb
+    } else {
+      action.uow.onComplete(^{ cb })
+      flush(action.uow.uow_id)
+    }
+  }
+
+
+  var metric_canceled_message_counter:Long = 0
+  var metric_canceled_enqueue_counter:Long = 0
+  var metric_flushed_message_counter:Long = 0
+  var metric_flushed_enqueue_counter:Long = 0
+
+  val commit_latency_counter = new TimeCounter
+  var commit_latency = commit_latency_counter(false)
+
+  val message_load_latency_counter = new TimeCounter
+  var message_load_latency = message_load_latency_counter(false)
+
+  val message_load_batch_size_counter = new IntMetricCounter
+  var message_load_batch_size = message_load_batch_size_counter(false)
+
+  var canceled_add_message:Long = 0
+  var canceled_enqueue:Long = 0
+
+
+  def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
+
+  val uow_source = createSource(new ListEventAggregator[DelayableUOW](), dispatchQueue)
+  uow_source.setEventHandler(^{drain_uows});
+  uow_source.resume
+
+  var pendingStores = new HashMap[Long, DelayableUOW#MessageAction]()
+  var pending_enqueues = new HashMap[(Long,Long), DelayableUOW#MessageAction]()
+  var delayedUOWs = new HashMap[Int, DelayableUOW]()
+
+  var next_batch_id = new IntCounter(1)
+
+  def drain_uows = {
+    uow_source.getData.foreach { uow =>
+
+      delayedUOWs.put(uow.uow_id, uow)
+
+      uow.actions.foreach { case (msg, action) =>
+
+        // dequeues can cancel out previous enqueues
+        action.dequeues.foreach { currentDequeue=>
+          val currentKey = key(currentDequeue)
+          val prev_action:DelayableUOW#MessageAction = pending_enqueues.remove(currentKey)
+
+          def prev_batch = prev_action.uow
+
+          if( prev_action!=null && !prev_batch.flushing ) {
+
+
+            prev_batch.delayable_actions -= 1
+            metric_canceled_enqueue_counter += 1
+
+            // yay we can cancel out a previous enqueue
+            prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == currentKey )
+
+            // if the message is not in any queues.. we can gc it..
+            if( prev_action.enqueues == Nil && prev_action.messageRecord !=null ) {
+              pendingStores.remove(msg)
+              prev_action.messageRecord = null
+              prev_batch.delayable_actions -= 1
+              metric_canceled_message_counter += 1
+            }
+
+            // Cancel the action if it's now empty
+            if( prev_action.isEmpty ) {
+              prev_action.cancel()
+            } else if( !prev_batch.delayable ) {
+              // flush it if there is no point in delaying anymore
+              flush(prev_batch.uow_id)
+            }
+
+            // since we canceled out the previous enqueue.. now cancel out the action
+            action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
+            if( action.isEmpty ) {
+              action.cancel()
+            }
+          }
+        }
+      }
+
+      val batch_id = uow.uow_id
+      if( uow.delayable ) {
+        dispatchQueue.dispatchAfter(flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
+      } else {
+        flush(batch_id)
+      }
+
+    }
+  }
+
+  private def flush(batch_id:Int) = {
+    flush_source.merge(batch_id)
+  }
+
+  val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
+  flush_source.setEventHandler(^{drain_flushes});
+  flush_source.resume
+
+  val flush_latency_counter = new TimeCounter
+  var flush_latency = flush_latency_counter(false)
+
+  def drain_flushes:Unit = {
+
+    if( !serviceState.isStarted ) {
+      return
+    }
+
+    val uows = flush_source.getData.flatMap{ uow_id =>
+
+      val uow = delayedUOWs.remove(uow_id)
+      // Message may be flushed or canceled before the timeout flush event..
+      // uow may be null in those cases
+      if (uow!=null) {
+        uow.flushing = true
+        Some(uow)
+      } else {
+        None
+      }
+    }
+
+    if( !uows.isEmpty ) {
+      flush_latency_counter.start { end=>
+        store(uows) {
+          end()
+          uows.foreach { uow=>
+
+            uow.actions.foreach { case (msg, action) =>
+              if( action.messageRecord !=null ) {
+                metric_flushed_message_counter += 1
+                pendingStores.remove(msg)
+              }
+              action.enqueues.foreach { queueEntry=>
+                metric_flushed_enqueue_counter += 1
+                val k = key(queueEntry)
+                pending_enqueues.remove(k)
+              }
+            }
+            uow.onPerformed
+
+          }
+        }
+      }
+    }
+  }
+
+}