You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/09 03:13:36 UTC
svn commit: r1133624 - in /activemq/activemq-apollo/trunk:
apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-cassandra/src/main/scala/org/apache/activemq/...
Author: chirino
Date: Thu Jun 9 01:13:35 2011
New Revision: 1133624
URL: http://svn.apache.org/viewvc?rev=1133624&view=rev
Log:
Tightening up the DelayingStoreSupport.
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Thu Jun 9 01:13:35 2011
@@ -320,12 +320,12 @@ class BDBClient(store: BDBStore) {
uow.actions.foreach {
case (msg, action) =>
- val message_record = action.messageRecord
+ val message_record = action.message_record
if (message_record != null) {
import PBSupport._
val pb = if( message_record.zero_copy_buffer != null ) {
- val r = to_pb(action.messageRecord).copy
+ val r = to_pb(action.message_record).copy
val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
r.setZcpFile(buffer.file)
r.setZcpOffset(buffer.offset)
@@ -334,10 +334,10 @@ class BDBClient(store: BDBStore) {
zcp_files_to_sync += buffer.file
r.freeze
} else {
- to_pb(action.messageRecord)
+ to_pb(action.message_record)
}
- messages_db.put(tx, action.messageRecord.key, pb)
+ messages_db.put(tx, action.message_record.key, pb)
}
action.enqueues.foreach { queueEntry =>
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Thu Jun 9 01:13:35 2011
@@ -21,6 +21,7 @@ import java.util.HashMap
import collection.Seq
import org.fusesource.hawtdispatch._
import java.util.concurrent._
+import atomic.AtomicInteger
import org.apache.activemq.apollo.util._
import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator}
import org.apache.activemq.apollo.dto.{StoreStatusDTO, TimeMetricDTO, IntMetricDTO}
@@ -61,16 +62,17 @@ trait DelayingStoreSupport extends Store
var dispose_start:Long = 0
var flushing = false;
+ var status = "init"
class MessageAction {
var msg= 0L
- var messageRecord: MessageRecord = null
+ var message_record: MessageRecord = null
var enqueues = ListBuffer[QueueEntryRecord]()
var dequeues = ListBuffer[QueueEntryRecord]()
def uow = DelayableUOW.this
- def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
+ def isEmpty() = message_record==null && enqueues==Nil && dequeues==Nil
def cancel() = {
uow.rm(msg)
@@ -81,15 +83,15 @@ trait DelayingStoreSupport extends Store
var actions = Map[Long, MessageAction]()
var completed = false
- var completeListeners = ListBuffer[() => Unit]()
- var disableDelay = false
+ var complete_listeners = ListBuffer[() => Unit]()
+ var disable_delay = false
def on_complete(callback: =>Unit) = {
if( this.synchronized {
if( completed ) {
true
} else {
- completeListeners += ( ()=> callback )
+ complete_listeners += ( ()=> callback )
false
}
}) {
@@ -97,11 +99,11 @@ trait DelayingStoreSupport extends Store
}
}
- def complete_asap() = this.synchronized { disableDelay=true }
+ def complete_asap() = this.synchronized { disable_delay=true }
var delayable_actions = 0
- def delayable = !disableDelay && delayable_actions>0 && flush_delay>=0
+ def delayable = !disable_delay && delayable_actions>0 && flush_delay>=0
def rm(msg:Long) = {
actions -= msg
@@ -111,20 +113,22 @@ trait DelayingStoreSupport extends Store
}
def cancel = {
- delayedUOWs.remove(uow_id)
- onPerformed
+ status += "|cancel"
+ flushing = true
+ delayed_uows.remove(uow_id)
+ on_performed
}
def store(record: MessageRecord):Long = {
record.key = get_next_msg_key
val action = new MessageAction
action.msg = record.key
- action.messageRecord = record
+ action.message_record = record
this.synchronized {
actions += record.key -> action
}
aggregator {
- pendingStores.put(record.key, action)
+ pending_stores.put(record.key, action)
}
delayable_actions += 1
record.key
@@ -161,31 +165,37 @@ trait DelayingStoreSupport extends Store
}
override def dispose = {
+ status += "|commited"
dispose_start = System.nanoTime
uow_source.merge(this)
}
- def onPerformed() = this.synchronized {
+ def on_performed() = this.synchronized {
+ status += "|performed"
commit_latency_counter += System.nanoTime-dispose_start
- completeListeners.foreach(_())
+ complete_listeners.foreach(_())
super.dispose
}
}
+ def flush_message(message_key: Long)(cb: => Unit) = flush_message_source.merge((message_key, cb _))
- def flush_message(messageKey: Long)(cb: => Unit) = dispatch_queue {
- val action: DelayableUOW#MessageAction = pendingStores.get(messageKey)
- if( action == null ) {
- cb
- } else {
- // TODO: protect against this causing a 2nd flush.
- delayedUOWs.put(action.uow.uow_id, action.uow)
- action.uow.on_complete( cb )
- flush(action.uow.uow_id)
+ val flush_message_source = createSource(new ListEventAggregator[(Long, ()=>Unit)](), dispatch_queue)
+ flush_message_source.setEventHandler(^{drain_flush_message});
+ flush_message_source.resume
+
+ def drain_flush_message:Unit = {
+ flush_message_source.getData.foreach { case (message_key, cb) =>
+ pending_stores.get(message_key) match {
+ case null => cb()
+ case action =>
+ action.uow.status += "|flush_message"
+ action.uow.on_complete( cb() )
+ flush(action.uow)
+ }
}
}
-
implicit def toTimeMetricDTO( m: TimeMetric) = {
val rc = new TimeMetricDTO()
rc.count = m.count
@@ -235,7 +245,21 @@ trait DelayingStoreSupport extends Store
rc.canceled_enqueue_counter = metric_canceled_enqueue_counter
rc.flushed_message_counter = metric_flushed_message_counter
rc.flushed_enqueue_counter = metric_flushed_enqueue_counter
- rc.pending_stores = pendingStores.size
+ rc.pending_stores = pending_stores.size
+
+// import collection.JavaConversions._
+// var last = ""
+// var count = 0
+// pending_stores.valuesIterator.map(_.uow.status).foreach{ line =>
+// if( last!= "" && last!=line) {
+// println(last+" occured "+count+" times")
+// count = 0
+// }
+// count += 1
+// last = line
+// }
+// println(last+" occured "+count+" times")
+// println("--------------")
}
def key(x:QueueEntryRecord) = (x.queue_key, x.entry_seq)
@@ -244,17 +268,18 @@ trait DelayingStoreSupport extends Store
uow_source.setEventHandler(^{drain_uows});
uow_source.resume
- var pendingStores = new HashMap[Long, DelayableUOW#MessageAction]()
+ var pending_stores = new HashMap[Long, DelayableUOW#MessageAction]()
var pending_enqueues = new HashMap[(Long,Long), DelayableUOW#MessageAction]()
- var delayedUOWs = new HashMap[Int, DelayableUOW]()
+ var delayed_uows = new HashMap[Int, DelayableUOW]()
- var next_batch_id = new IntCounter(1)
+ val next_batch_id = new AtomicInteger(1)
def drain_uows = {
dispatch_queue.assertExecuting()
uow_source.getData.foreach { uow =>
- delayedUOWs.put(uow.uow_id, uow)
+ uow.status += "|delayed"
+ delayed_uows.put(uow.uow_id, uow)
uow.actions.foreach { case (msg, action) =>
@@ -263,31 +288,31 @@ trait DelayingStoreSupport extends Store
val currentKey = key(currentDequeue)
val prev_action:DelayableUOW#MessageAction = pending_enqueues.remove(currentKey)
- def prev_batch = prev_action.uow
+ def prev_uow = prev_action.uow
- if( prev_action!=null && !prev_batch.flushing ) {
+ if( prev_action!=null && !prev_uow.flushing ) {
- prev_batch.delayable_actions -= 1
+ prev_uow.delayable_actions -= 1
metric_canceled_enqueue_counter += 1
// yay we can cancel out a previous enqueue
prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == currentKey )
// if the message is not in any queues.. we can gc it..
- if( prev_action.enqueues == Nil && prev_action.messageRecord !=null ) {
- pendingStores.remove(msg)
- prev_action.messageRecord = null
- prev_batch.delayable_actions -= 1
+ if( prev_action.enqueues == Nil && prev_action.message_record !=null ) {
+ pending_stores.remove(msg)
+ prev_action.message_record = null
+ prev_uow.delayable_actions -= 1
metric_canceled_message_counter += 1
}
// Cancel the action if it's now empty
if( prev_action.isEmpty ) {
prev_action.cancel()
- } else if( !prev_batch.delayable ) {
+ } else if( !prev_uow.delayable ) {
// flush it if there is no point in delaying anymore
- flush(prev_batch.uow_id)
+ flush(prev_uow)
}
// since we canceled out the previous enqueue.. now cancel out the action
@@ -299,21 +324,28 @@ trait DelayingStoreSupport extends Store
}
}
- val batch_id = uow.uow_id
+ val uow_id = uow.uow_id
if( uow.delayable ) {
- dispatch_queue.executeAfter(flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
+ dispatch_queue.executeAfter(flush_delay, TimeUnit.MILLISECONDS, ^{
+ flush(delayed_uows.get(uow_id))
+ })
} else {
- flush(batch_id)
+ flush(uow)
}
}
}
- private def flush(batch_id:Int) = {
- flush_source.merge(batch_id)
+ private def flush(uow:DelayableUOW) = {
+ if( uow!=null && !uow.flushing ) {
+ uow.flushing = true
+ delayed_uows.remove(uow.uow_id)
+ uow.status += "|flushing"
+ flush_source.merge(uow)
+ }
}
- val flush_source = createSource(new ListEventAggregator[Int](), dispatch_queue)
+ val flush_source = createSource(new ListEventAggregator[DelayableUOW](), dispatch_queue)
flush_source.setEventHandler(^{drain_flushes});
flush_source.resume
@@ -327,19 +359,7 @@ trait DelayingStoreSupport extends Store
return
}
- val uows = flush_source.getData.flatMap{ uow_id =>
-
- val uow = delayedUOWs.remove(uow_id)
- // Message may be flushed or canceled before the timeout flush event..
- // uow may be null in those cases
- if (uow!=null) {
- uow.flushing = true
- Some(uow)
- } else {
- None
- }
- }
-
+ val uows = flush_source.getData
if( !uows.isEmpty ) {
flush_latency_counter.start { end=>
flush_source.suspend
@@ -348,18 +368,19 @@ trait DelayingStoreSupport extends Store
flush_source.resume
end()
uows.foreach { uow=>
+ uow.status += "|flushed"
uow.actions.foreach { case (msg, action) =>
- if( action.messageRecord !=null ) {
+ if( action.message_record !=null ) {
metric_flushed_message_counter += 1
- pendingStores.remove(msg)
+ pending_stores.remove(msg)
}
- action.enqueues.foreach { queueEntry=>
+ action.enqueues.foreach { queue_entry=>
metric_flushed_enqueue_counter += 1
- val k = key(queueEntry)
+ val k = key(queue_entry)
pending_enqueues.remove(k)
}
}
- uow.onPerformed
+ uow.on_performed
}
}
Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala Thu Jun 9 01:13:35 2011
@@ -116,8 +116,8 @@ class CassandraClient() {
tx.actions.foreach {
case (msg, action) =>
var rc =
- if (action.messageRecord != null) {
- operations ::= Insert( schema.message_data \ (msg, action.messageRecord ) )
+ if (action.message_record != null) {
+ operations ::= Insert( schema.message_data \ (msg, action.message_record ) )
}
action.enqueues.foreach {
queueEntry =>
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Thu Jun 9 01:13:35 2011
@@ -234,8 +234,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
tx =>
tx.actions.foreach {
case (msg, action) =>
- if (action.messageRecord != null) {
- val update: AddMessage.Bean = action.messageRecord
+ if (action.message_record != null) {
+ val update: AddMessage.Bean = action.message_record
batch += update
}
action.enqueues.foreach {
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Thu Jun 9 01:13:35 2011
@@ -355,11 +355,11 @@ class JDBM2Client(store: JDBM2Store) {
uows.foreach { uow =>
uow.actions.foreach { case (msg, action) =>
- val message_record = action.messageRecord
+ val message_record = action.message_record
if (message_record != null) {
val pb = if( message_record.zero_copy_buffer != null ) {
- val r = to_pb(action.messageRecord).copy
+ val r = to_pb(action.message_record).copy
val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
r.setZcpFile(buffer.file)
r.setZcpOffset(buffer.offset)
@@ -368,12 +368,12 @@ class JDBM2Client(store: JDBM2Store) {
zcp_files_to_sync += buffer.file
r.freeze
} else {
- to_pb(action.messageRecord)
+ to_pb(action.message_record)
}
- messages_db.put(action.messageRecord.key, pb)
- if( action.messageRecord.key > last_message_key ) {
- last_message_key = action.messageRecord.key
+ messages_db.put(action.message_record.key, pb)
+ if( action.message_record.key > last_message_key ) {
+ last_message_key = action.message_record.key
recman.setNamedObject("last_message_key", last_message_key)
}
}