You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/30 15:00:33 UTC
svn commit: r980772 - in /activemq/activemq-apollo/trunk:
apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/
apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/
apollo-store/src/main/scala/org/apache/activemq/...
Author: chirino
Date: Fri Jul 30 13:00:33 2010
New Revision: 980772
URL: http://svn.apache.org/viewvc?rev=980772&view=rev
Log:
Extracted out a common DelayingStoreSupport trait that the hawtdb and cassandra store impls can share.
Added:
activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala
Modified:
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala?rev=980772&r1=980771&r2=980772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraClient.scala Fri Jul 30 13:00:33 2010
@@ -166,7 +166,7 @@ class CassandraClient() {
}
- def store(txs:Seq[CassandraStore#CassandraUOW]) {
+ def store(txs:Seq[DelayingStoreSupport#DelayableUOW]) {
withSession {
session =>
var operations = List[Operation]()
@@ -175,8 +175,8 @@ class CassandraClient() {
tx.actions.foreach {
case (msg, action) =>
var rc =
- if (action.store != null) {
- operations ::= Insert( schema.message_data \ (msg, encodeMessageRecord(action.store) ) )
+ if (action.messageRecord != null) {
+ operations ::= Insert( schema.message_data \ (msg, encodeMessageRecord(action.messageRecord) ) )
}
action.enqueues.foreach {
queueEntry =>
Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala?rev=980772&r1=980771&r2=980772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/store/cassandra/CassandraStore.scala Fri Jul 30 13:00:33 2010
@@ -57,24 +57,32 @@ object CassandraStore extends Log {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class CassandraStore extends Store with BaseService with Logging {
+class CassandraStore extends DelayingStoreSupport with Logging {
import CassandraStore._
override protected def log = CassandraStore
-
- /////////////////////////////////////////////////////////////////////
- //
- // Implementation of the BaseService interface
- //
- /////////////////////////////////////////////////////////////////////
- val dispatchQueue = createQueue("cassandra store")
-
+
var next_msg_key = new AtomicLong(1)
val client = new CassandraClient()
var config:CassandraStoreDTO = defaultConfig
var blocking:ExecutorService = null
+ def flush_delay = config.flush_delay
+
+ override def toString = "cassandra store"
+
+ protected def get_next_msg_key = next_msg_key.getAndIncrement
+
+ protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
+ blocking {
+ client.store(uows)
+ dispatchQueue {
+ callback
+ }
+ }
+ }
+
def configure(config: StoreDTO, reporter: Reporter):Unit = configure(config.asInstanceOf[CassandraStoreDTO], reporter)
@@ -218,230 +226,5 @@ class CassandraStore extends Store with
}
}
- def flushMessage(id: Long)(callback: => Unit) = ^{
- val action: CassandraUOW#MessageAction = pendingStores.get(id)
- if( action == null ) {
- callback
- } else {
- action.uow.onComplete(callback _)
- flush(action.uow.uow_id)
- }
-
- } >>: dispatchQueue
-
- def createStoreUOW() = new CassandraUOW
-
-
- /////////////////////////////////////////////////////////////////////
- //
- // Implementation of the StoreBatch interface
- //
- /////////////////////////////////////////////////////////////////////
- class CassandraUOW extends BaseRetained with StoreUOW {
-
- class MessageAction {
-
- var msg= 0L
- var store: MessageRecord = null
- var enqueues = ListBuffer[QueueEntryRecord]()
- var dequeues = ListBuffer[QueueEntryRecord]()
-
- def uow = CassandraUOW.this
- def isEmpty() = store==null && enqueues==Nil && dequeues==Nil
- def cancel() = {
- uow.rm(msg)
- if( uow.isEmpty ) {
- uow.cancel
- }
- }
- }
-
- val uow_id:Int = next_uow_id.getAndIncrement
- var actions = Map[Long, MessageAction]()
- var flushing= false
-
- var completeListeners = ListBuffer[Runnable]()
-
- def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
-
- var disableDelay = false
- def completeASAP() = this.synchronized { disableDelay=true }
-
- def delayable = !disableDelay
-
-
- def rm(msg:Long) = {
- actions -= msg
- }
-
- def isEmpty = actions.isEmpty
- def cancel = {
- delayedUOWs.remove(uow_id)
- onPerformed
- }
-
- def store(record: MessageRecord):Long = {
- record.key = next_msg_key.getAndIncrement
- val action = new MessageAction
- action.msg = record.key
- action.store = record
- this.synchronized {
- actions += record.key -> action
- }
- dispatchQueue {
- pendingStores.put(record.key, action)
- }
- record.key
- }
-
- def action(msg:Long) = {
- actions.get(msg) match {
- case Some(x) => x
- case None =>
- val x = new MessageAction
- x.msg = msg
- actions += msg->x
- x
- }
- }
-
- def enqueue(entry: QueueEntryRecord) = {
- this.synchronized {
- val a = action(entry.messageKey)
- a.enqueues += entry
- dispatchQueue {
- pendingEnqueues.put(key(entry), a)
- }
- }
- }
-
- def dequeue(entry: QueueEntryRecord) = {
- this.synchronized {
- action(entry.messageKey).dequeues += entry
- }
- }
-
- override def dispose = {
- uow_source.merge(this)
- }
-
-
- def onPerformed() {
- completeListeners.foreach { x=>
- x.run()
- }
- super.dispose
- }
- }
-
- def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
-
- val uow_source = createSource(new ListEventAggregator[CassandraUOW](), dispatchQueue)
- uow_source.setEventHandler(^{drain_uows});
- uow_source.resume
-
- var pendingStores = new HashMap[Long, CassandraUOW#MessageAction]()
- var pendingEnqueues = new HashMap[(Long,Long), CassandraUOW#MessageAction]()
- var delayedUOWs = new HashMap[Int, CassandraUOW]()
-
- var next_uow_id = new IntCounter(1)
-
- def drain_uows = {
- uow_source.getData.foreach { uow =>
-
- val uow_id = uow.uow_id
- delayedUOWs.put(uow_id, uow)
-
- uow.actions.foreach { case (msg, action) =>
-
- // dequeues can cancel out previous enqueues
- action.dequeues.foreach { currentDequeue=>
- val currentKey = key(currentDequeue)
- val prevAction:CassandraUOW#MessageAction = pendingEnqueues.remove(currentKey)
- if( prevAction!=null && !prevAction.uow.flushing ) {
-
- // yay we can cancel out a previous enqueue
- prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
-
- // if the message is not in any queues.. we can gc it..
- if( prevAction.enqueues == Nil && prevAction.store !=null ) {
- pendingStores.remove(msg)
- prevAction.store = null
- }
-
- // Cancel the action if it's now empty
- if( prevAction.isEmpty ) {
- prevAction.cancel()
- }
-
- // since we canceled out the previous enqueue.. now cancel out the action
- action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
- if( action.isEmpty ) {
- action.cancel()
- }
- }
- }
- }
-
- if( !uow.completeListeners.isEmpty || config.flush_delay <= 0 ) {
- flush(uow_id)
- } else {
- dispatchQueue.dispatchAfter(config.flush_delay, TimeUnit.MILLISECONDS, ^{flush(uow_id)})
- }
-
- }
- }
-
- def flush(uow_id:Int) = {
- flush_source.merge(uow_id)
- }
-
- val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
- flush_source.setEventHandler(^{drain_flushes});
- flush_source.resume
-
- def drain_flushes:Unit = {
-
- if( !serviceState.isStarted ) {
- return
- }
-
- val uows = flush_source.getData.flatMap{ uow_id =>
- val uow = delayedUOWs.remove(uow_id)
- // Message may be flushed or canceled before the timeout flush event..
- // uow may be null in those cases
- if (uow!=null) {
- uow.flushing = true
- Some(uow)
- } else {
- None
- }
- }
-
- if( !uows.isEmpty ) {
- storeLatency.start { end =>
- blocking {
- client.store(uows)
- dispatchQueue {
- end()
- uows.foreach { uow=>
-
- uow.actions.foreach { case (msg, action) =>
- if( action.store!=null ) {
- pendingStores.remove(msg)
- }
- action.enqueues.foreach { queueEntry=>
- val k = key(queueEntry)
- pendingEnqueues.remove(k)
- }
- }
-
- uow.onPerformed
- }
- }
- }
- }
- }
- }
}
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala?rev=980772&r1=980771&r2=980772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala Fri Jul 30 13:00:33 2010
@@ -235,7 +235,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
_store(update, callback)
}
- def store(txs: Seq[HawtDBStore#HawtDBUOW], callback:Runnable) {
+ def store(txs: Seq[HawtDBStore#DelayableUOW], callback:Runnable) {
var batch = ListBuffer[TypeCreatable]()
txs.foreach {
tx =>
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala?rev=980772&r1=980771&r2=980772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala Fri Jul 30 13:00:33 2010
@@ -16,19 +16,18 @@
*/
package org.apache.activemq.apollo.store.hawtdb
-import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent.atomic.AtomicLong
import collection.mutable.ListBuffer
import java.util.HashMap
import collection.{Seq}
import org.fusesource.hawtdispatch.ScalaDispatch._
-import org.fusesource.hawtdispatch.ListEventAggregator
import java.io.File
import java.util.concurrent._
import org.apache.activemq.apollo.dto._
import org.apache.activemq.apollo.store._
import org.apache.activemq.apollo.util._
import ReporterLevel._
+import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained, ListEventAggregator}
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -57,18 +56,11 @@ object HawtDBStore extends Log {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class HawtDBStore extends Store with BaseService with DispatchLogging {
+class HawtDBStore extends DelayingStoreSupport with DispatchLogging {
import HawtDBStore._
override protected def log = HawtDBStore
- /////////////////////////////////////////////////////////////////////
- //
- // Implementation of the BaseService interface
- //
- /////////////////////////////////////////////////////////////////////
- val dispatchQueue = createQueue("hawtdb store")
-
var next_queue_key = new AtomicLong(1)
var next_msg_key = new AtomicLong(1)
@@ -76,6 +68,22 @@ class HawtDBStore extends Store with Bas
var config:HawtDBStoreDTO = defaultConfig
val client = new HawtDBClient(this)
+ override def toString = "hawtdb store"
+
+ def flush_delay = config.flush_delay
+
+ protected def get_next_msg_key = next_msg_key.getAndIncrement
+
+ protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
+ executor_pool {
+ client.store(uows, ^{
+ dispatchQueue {
+ callback
+ }
+ })
+ }
+ }
+
def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[HawtDBStoreDTO], reporter)
def configure(config: HawtDBStoreDTO, reporter: Reporter) = {
@@ -173,8 +181,6 @@ class HawtDBStore extends Store with Bas
}
}
-
-
val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
load_source.setEventHandler(^{drain_loads});
load_source.resume
@@ -209,158 +215,6 @@ class HawtDBStore extends Store with Bas
}
}
- def flushMessage(messageKey: Long)(cb: => Unit) = dispatchQueue {
- val action: HawtDBUOW#MessageAction = pendingStores.get(messageKey)
- if( action == null ) {
- cb
- } else {
- action.uow.onComplete(^{ cb })
- flush(action.uow.uow_id)
- }
- }
-
- def createStoreUOW() = new HawtDBUOW
-
-
- /////////////////////////////////////////////////////////////////////
- //
- // Implementation of the StoreBatch interface
- //
- /////////////////////////////////////////////////////////////////////
- class HawtDBUOW extends BaseRetained with StoreUOW {
-
- var dispose_start:Long = 0
- var flushing = false;
-
- class MessageAction {
-
- var msg= 0L
- var messageRecord: MessageRecord = null
- var enqueues = ListBuffer[QueueEntryRecord]()
- var dequeues = ListBuffer[QueueEntryRecord]()
-
- def uow = HawtDBUOW.this
- def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
-
- def cancel() = {
- uow.rm(msg)
- }
- }
-
- val uow_id:Int = next_batch_id.getAndIncrement
- var actions = Map[Long, MessageAction]()
-
- var completeListeners = ListBuffer[Runnable]()
- var disableDelay = false
-
- def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
-
- def completeASAP() = this.synchronized { disableDelay=true }
-
- var delayable_actions = 0
-
- def delayable = !disableDelay && delayable_actions>0 && config.flush_delay>=0
-
- def rm(msg:Long) = {
- actions -= msg
- if( actions.isEmpty ) {
- cancel
- }
- }
-
- def cancel = {
- delayedUOWs.remove(uow_id)
- onPerformed
- }
-
- def store(record: MessageRecord):Long = {
- record.key = next_msg_key.getAndIncrement
- val action = new MessageAction
- action.msg = record.key
- action.messageRecord = record
- this.synchronized {
- actions += record.key -> action
- }
- dispatchQueue {
- pendingStores.put(record.key, action)
- }
- delayable_actions += 1
- record.key
- }
-
- def action(msg:Long) = {
- actions.get(msg) match {
- case Some(x) => x
- case None =>
- val x = new MessageAction
- x.msg = msg
- actions += msg->x
- x
- }
- }
-
- def enqueue(entry: QueueEntryRecord) = {
- val a = this.synchronized {
- val a = action(entry.messageKey)
- a.enqueues += entry
- delayable_actions += 1
- a
- }
- dispatchQueue {
- pending_enqueues.put(key(entry), a)
- }
-
- }
-
- def dequeue(entry: QueueEntryRecord) = {
- this.synchronized {
- action(entry.messageKey).dequeues += entry
- }
- }
-
- override def dispose = {
- dispose_start = System.nanoTime
- uow_source.merge(this)
- }
-
- def onPerformed() = this.synchronized {
- commit_latency_counter += System.nanoTime-dispose_start
- completeListeners.foreach { x=>
- x.run
- }
- super.dispose
- }
- }
-
- var metric_canceled_message_counter:Long = 0
- var metric_canceled_enqueue_counter:Long = 0
- var metric_flushed_message_counter:Long = 0
- var metric_flushed_enqueue_counter:Long = 0
-
- val commit_latency_counter = new TimeCounter
- var commit_latency = commit_latency_counter(false)
-
- val message_load_latency_counter = new TimeCounter
- var message_load_latency = message_load_latency_counter(false)
-
- val message_load_batch_size_counter = new IntMetricCounter
- var message_load_batch_size = message_load_batch_size_counter(false)
-
- var canceled_add_message:Long = 0
- var canceled_enqueue:Long = 0
-
-
- def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
-
- val uow_source = createSource(new ListEventAggregator[HawtDBUOW](), dispatchQueue)
- uow_source.setEventHandler(^{drain_uows});
- uow_source.resume
-
- var pendingStores = new HashMap[Long, HawtDBUOW#MessageAction]()
- var pending_enqueues = new HashMap[(Long,Long), HawtDBUOW#MessageAction]()
- var delayedUOWs = new HashMap[Int, HawtDBUOW]()
-
- var next_batch_id = new IntCounter(1)
implicit def toTimeMetricDTO( m: TimeMetric) = {
val rc = new TimeMetricDTO()
@@ -380,28 +234,6 @@ class HawtDBStore extends Store with Bas
rc
}
- def storeStatusDTO(callback:(StoreStatusDTO)=>Unit) = dispatchQueue {
- val rc = new HawtDBStoreStatusDTO
-
- rc.state = serviceState.toString
- rc.state_since = serviceState.since
-
- rc.flush_latency = flush_latency
- rc.message_load_latency = message_load_latency
- rc.message_load_batch_size = message_load_batch_size
-
- rc.journal_append_latency = client.metric_journal_append
- rc.index_update_latency = client.metric_index_update
-
- rc.canceled_message_counter = metric_canceled_message_counter
- rc.canceled_enqueue_counter = metric_canceled_enqueue_counter
- rc.flushed_message_counter = metric_flushed_message_counter
- rc.flushed_enqueue_counter = metric_flushed_enqueue_counter
-
- callback(rc)
- }
-
-
def poll_stats:Unit = {
def displayStats = {
if( serviceState.isStarted ) {
@@ -416,126 +248,28 @@ class HawtDBStore extends Store with Bas
poll_stats
}
}
-
- dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
- }
-
- def drain_uows = {
- uow_source.getData.foreach { uow =>
-
- delayedUOWs.put(uow.uow_id, uow)
-
- uow.actions.foreach { case (msg, action) =>
-
- // dequeues can cancel out previous enqueues
- action.dequeues.foreach { currentDequeue=>
- val currentKey = key(currentDequeue)
- val prev_action:HawtDBUOW#MessageAction = pending_enqueues.remove(currentKey)
-
- def prev_batch = prev_action.uow
-
- if( prev_action!=null && !prev_batch.flushing ) {
-
-
- prev_batch.delayable_actions -= 1
- metric_canceled_enqueue_counter += 1
-
- // yay we can cancel out a previous enqueue
- prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == currentKey )
-
- // if the message is not in any queues.. we can gc it..
- if( prev_action.enqueues == Nil && prev_action.messageRecord !=null ) {
- pendingStores.remove(msg)
- prev_action.messageRecord = null
- prev_batch.delayable_actions -= 1
- metric_canceled_message_counter += 1
- }
-
- // Cancel the action if it's now empty
- if( prev_action.isEmpty ) {
- prev_action.cancel()
- } else if( !prev_batch.delayable ) {
- // flush it if there is no point in delyaing anymore
- flush(prev_batch.uow_id)
- }
-
- // since we canceled out the previous enqueue.. now cancel out the action
- action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
- if( action.isEmpty ) {
- action.cancel()
- }
- }
- }
- }
-
- val batch_id = uow.uow_id
- if( uow.delayable ) {
- dispatchQueue.dispatchAfter(config.flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
- } else {
- flush(batch_id)
- }
-
- }
- }
- def flush(batch_id:Int) = {
- flush_source.merge(batch_id)
+ dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ displayStats })
}
- val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
- flush_source.setEventHandler(^{drain_flushes});
- flush_source.resume
+ def storeStatusDTO(callback:(StoreStatusDTO)=>Unit) = dispatchQueue {
+ val rc = new HawtDBStoreStatusDTO
- val flush_latency_counter = new TimeCounter
- var flush_latency = flush_latency_counter(false)
+ rc.state = serviceState.toString
+ rc.state_since = serviceState.since
- def drain_flushes:Unit = {
+ rc.flush_latency = flush_latency
+ rc.message_load_latency = message_load_latency
+ rc.message_load_batch_size = message_load_batch_size
- if( !serviceState.isStarted ) {
- return
- }
-
- val uows = flush_source.getData.flatMap{ uow_id =>
+ rc.journal_append_latency = client.metric_journal_append
+ rc.index_update_latency = client.metric_index_update
- val uow = delayedUOWs.remove(uow_id)
- // Message may be flushed or canceled before the timeout flush event..
- // uow may be null in those cases
- if (uow!=null) {
- uow.flushing = true
- Some(uow)
- } else {
- None
- }
- }
+ rc.canceled_message_counter = metric_canceled_message_counter
+ rc.canceled_enqueue_counter = metric_canceled_enqueue_counter
+ rc.flushed_message_counter = metric_flushed_message_counter
+ rc.flushed_enqueue_counter = metric_flushed_enqueue_counter
- if( !uows.isEmpty ) {
- flush_latency_counter.start { end=>
- executor_pool {
- client.store(uows, ^{
- dispatchQueue {
-
- end()
- uows.foreach { uow=>
-
- uow.actions.foreach { case (msg, action) =>
- if( action.messageRecord !=null ) {
- metric_flushed_message_counter += 1
- pendingStores.remove(msg)
- }
- action.enqueues.foreach { queueEntry=>
- metric_flushed_enqueue_counter += 1
- val k = key(queueEntry)
- pending_enqueues.remove(k)
- }
- }
- uow.onPerformed
-
- }
- }
- })
- }
- }
- }
+ callback(rc)
}
-
}
Added: activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala?rev=980772&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala Fri Jul 30 13:00:33 2010
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.store
+
+import collection.mutable.ListBuffer
+import java.util.HashMap
+import collection.Seq
+import org.fusesource.hawtdispatch.ScalaDispatch._
+import java.util.concurrent._
+import org.apache.activemq.apollo.util._
+import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator}
+
+/**
+ * <p>
+ * Support trait for implementing Stores which delay performing updates
+ * so that a pending update can potentially be canceled by a
+ * subsequent operation.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait DelayingStoreSupport extends Store with BaseService {
+
+ protected def flush_delay:Long
+
+ protected def get_next_msg_key:Long
+
+ protected def store(uows: Seq[DelayableUOW])(callback: =>Unit):Unit
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation of the BaseService interface
+ //
+ /////////////////////////////////////////////////////////////////////
+ val dispatchQueue = createQueue(toString)
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation of the StoreUOW interface
+ //
+ /////////////////////////////////////////////////////////////////////
+ def createStoreUOW() = new DelayableUOW
+
+ class DelayableUOW extends BaseRetained with StoreUOW {
+
+ var dispose_start:Long = 0
+ var flushing = false;
+
+ class MessageAction {
+
+ var msg= 0L
+ var messageRecord: MessageRecord = null
+ var enqueues = ListBuffer[QueueEntryRecord]()
+ var dequeues = ListBuffer[QueueEntryRecord]()
+
+ def uow = DelayableUOW.this
+ def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
+
+ def cancel() = {
+ uow.rm(msg)
+ }
+ }
+
+ val uow_id:Int = next_batch_id.getAndIncrement
+ var actions = Map[Long, MessageAction]()
+
+ var completeListeners = ListBuffer[Runnable]()
+ var disableDelay = false
+
+ def onComplete(callback: Runnable) = if( callback!=null ) { this.synchronized { completeListeners += callback } }
+
+ def completeASAP() = this.synchronized { disableDelay=true }
+
+ var delayable_actions = 0
+
+ def delayable = !disableDelay && delayable_actions>0 && flush_delay>=0
+
+ def rm(msg:Long) = {
+ actions -= msg
+ if( actions.isEmpty ) {
+ cancel
+ }
+ }
+
+ def cancel = {
+ delayedUOWs.remove(uow_id)
+ onPerformed
+ }
+
+ def store(record: MessageRecord):Long = {
+ record.key = get_next_msg_key
+ val action = new MessageAction
+ action.msg = record.key
+ action.messageRecord = record
+ this.synchronized {
+ actions += record.key -> action
+ }
+ dispatchQueue {
+ pendingStores.put(record.key, action)
+ }
+ delayable_actions += 1
+ record.key
+ }
+
+ def action(msg:Long) = {
+ actions.get(msg) match {
+ case Some(x) => x
+ case None =>
+ val x = new MessageAction
+ x.msg = msg
+ actions += msg->x
+ x
+ }
+ }
+
+ def enqueue(entry: QueueEntryRecord) = {
+ val a = this.synchronized {
+ val a = action(entry.messageKey)
+ a.enqueues += entry
+ delayable_actions += 1
+ a
+ }
+ dispatchQueue {
+ pending_enqueues.put(key(entry), a)
+ }
+
+ }
+
+ def dequeue(entry: QueueEntryRecord) = {
+ this.synchronized {
+ action(entry.messageKey).dequeues += entry
+ }
+ }
+
+ override def dispose = {
+ dispose_start = System.nanoTime
+ uow_source.merge(this)
+ }
+
+ def onPerformed() = this.synchronized {
+ commit_latency_counter += System.nanoTime-dispose_start
+ completeListeners.foreach { x=>
+ x.run
+ }
+ super.dispose
+ }
+ }
+
+
+ def flushMessage(messageKey: Long)(cb: => Unit) = dispatchQueue {
+ val action: DelayableUOW#MessageAction = pendingStores.get(messageKey)
+ if( action == null ) {
+ cb
+ } else {
+ action.uow.onComplete(^{ cb })
+ flush(action.uow.uow_id)
+ }
+ }
+
+
+ var metric_canceled_message_counter:Long = 0
+ var metric_canceled_enqueue_counter:Long = 0
+ var metric_flushed_message_counter:Long = 0
+ var metric_flushed_enqueue_counter:Long = 0
+
+ val commit_latency_counter = new TimeCounter
+ var commit_latency = commit_latency_counter(false)
+
+ val message_load_latency_counter = new TimeCounter
+ var message_load_latency = message_load_latency_counter(false)
+
+ val message_load_batch_size_counter = new IntMetricCounter
+ var message_load_batch_size = message_load_batch_size_counter(false)
+
+ var canceled_add_message:Long = 0
+ var canceled_enqueue:Long = 0
+
+
+ def key(x:QueueEntryRecord) = (x.queueKey, x.queueSeq)
+
+ val uow_source = createSource(new ListEventAggregator[DelayableUOW](), dispatchQueue)
+ uow_source.setEventHandler(^{drain_uows});
+ uow_source.resume
+
+ var pendingStores = new HashMap[Long, DelayableUOW#MessageAction]()
+ var pending_enqueues = new HashMap[(Long,Long), DelayableUOW#MessageAction]()
+ var delayedUOWs = new HashMap[Int, DelayableUOW]()
+
+ var next_batch_id = new IntCounter(1)
+
+ def drain_uows = {
+ uow_source.getData.foreach { uow =>
+
+ delayedUOWs.put(uow.uow_id, uow)
+
+ uow.actions.foreach { case (msg, action) =>
+
+ // dequeues can cancel out previous enqueues
+ action.dequeues.foreach { currentDequeue=>
+ val currentKey = key(currentDequeue)
+ val prev_action:DelayableUOW#MessageAction = pending_enqueues.remove(currentKey)
+
+ def prev_batch = prev_action.uow
+
+ if( prev_action!=null && !prev_batch.flushing ) {
+
+
+ prev_batch.delayable_actions -= 1
+ metric_canceled_enqueue_counter += 1
+
+ // yay we can cancel out a previous enqueue
+ prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == currentKey )
+
+ // if the message is not in any queues.. we can gc it..
+ if( prev_action.enqueues == Nil && prev_action.messageRecord !=null ) {
+ pendingStores.remove(msg)
+ prev_action.messageRecord = null
+ prev_batch.delayable_actions -= 1
+ metric_canceled_message_counter += 1
+ }
+
+ // Cancel the action if it's now empty
+ if( prev_action.isEmpty ) {
+ prev_action.cancel()
+ } else if( !prev_batch.delayable ) {
+ // flush it if there is no point in delaying anymore
+ flush(prev_batch.uow_id)
+ }
+
+ // since we canceled out the previous enqueue.. now cancel out the action
+ action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
+ if( action.isEmpty ) {
+ action.cancel()
+ }
+ }
+ }
+ }
+
+ val batch_id = uow.uow_id
+ if( uow.delayable ) {
+ dispatchQueue.dispatchAfter(flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
+ } else {
+ flush(batch_id)
+ }
+
+ }
+ }
+
+ private def flush(batch_id:Int) = {
+ flush_source.merge(batch_id)
+ }
+
+ val flush_source = createSource(new ListEventAggregator[Int](), dispatchQueue)
+ flush_source.setEventHandler(^{drain_flushes});
+ flush_source.resume
+
+ val flush_latency_counter = new TimeCounter
+ var flush_latency = flush_latency_counter(false)
+
+ def drain_flushes:Unit = {
+
+ if( !serviceState.isStarted ) {
+ return
+ }
+
+ val uows = flush_source.getData.flatMap{ uow_id =>
+
+ val uow = delayedUOWs.remove(uow_id)
+ // Message may be flushed or canceled before the timeout flush event..
+ // uow may be null in those cases
+ if (uow!=null) {
+ uow.flushing = true
+ Some(uow)
+ } else {
+ None
+ }
+ }
+
+ if( !uows.isEmpty ) {
+ flush_latency_counter.start { end=>
+ store(uows) {
+ end()
+ uows.foreach { uow=>
+
+ uow.actions.foreach { case (msg, action) =>
+ if( action.messageRecord !=null ) {
+ metric_flushed_message_counter += 1
+ pendingStores.remove(msg)
+ }
+ action.enqueues.foreach { queueEntry=>
+ metric_flushed_enqueue_counter += 1
+ val k = key(queueEntry)
+ pending_enqueues.remove(k)
+ }
+ }
+ uow.onPerformed
+
+ }
+ }
+ }
+ }
+ }
+
+}