You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:25 UTC
svn commit: r961152 - in /activemq/sandbox/activemq-apollo-actor:
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
Author: chirino
Date: Wed Jul 7 04:11:25 2010
New Revision: 961152
URL: http://svn.apache.org/viewvc?rev=961152&view=rev
Log:
Track the in flight message stores and enqueue better so that we don't erroniously report them being flushed before they are actually fully flushed and indexed.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961152&r1=961151&r2=961152&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul 7 04:11:25 2010
@@ -241,6 +241,7 @@ class CassandraStore extends Store with
val txid:Int = next_tx_id.getAndIncrement
var actions = Map[Long, MessageAction]()
+ var flushing= false
var flushListeners = ListBuffer[Runnable]()
def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
@@ -263,6 +264,9 @@ class CassandraStore extends Store with
this.synchronized {
actions += record.key -> action
}
+ dispatchQueue {
+ pendingStores.put(record.key, action)
+ }
record.key
}
@@ -279,7 +283,11 @@ class CassandraStore extends Store with
def enqueue(entry: QueueEntryRecord) = {
this.synchronized {
- action(entry.messageKey).enqueues += entry
+ val a = action(entry.messageKey)
+ a.enqueues += entry
+ dispatchQueue {
+ pendingEnqueues.put(key(entry), a)
+ }
}
}
@@ -321,19 +329,12 @@ class CassandraStore extends Store with
delayedTransactions.put(tx_id, tx)
tx.actions.foreach { case (msg, action) =>
- if( action.store!=null ) {
- pendingStores.put(msg, action)
- }
- action.enqueues.foreach { queueEntry=>
- pendingEnqueues.put(key(queueEntry), action)
- }
-
// dequeues can cancel out previous enqueues
action.dequeues.foreach { currentDequeue=>
val currentKey = key(currentDequeue)
val prevAction:CassandraBatch#MessageAction = pendingEnqueues.remove(currentKey)
- if( prevAction!=null ) {
+ if( prevAction!=null && !prevAction.tx.flushing ) {
// yay we can cancel out a previous enqueue
prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
@@ -386,17 +387,7 @@ class CassandraStore extends Store with
// Message may be flushed or canceled before the timeout flush event..
// tx may be null in those cases
if (tx!=null) {
-
- tx.actions.foreach { case (msg, action) =>
- if( action.store!=null ) {
- pendingStores.remove(msg)
- }
- action.enqueues.foreach { queueEntry=>
- val k = key(queueEntry)
- pendingEnqueues.remove(k)
- }
- }
-
+ tx.flushing = true
Some(tx)
} else {
None
@@ -409,8 +400,19 @@ class CassandraStore extends Store with
client.store(txs)
dispatchQueue {
end()
- txs.foreach { x=>
- x.onPerformed
+ txs.foreach { tx=>
+
+ tx.actions.foreach { case (msg, action) =>
+ if( action.store!=null ) {
+ pendingStores.remove(msg)
+ }
+ action.enqueues.foreach { queueEntry=>
+ val k = key(queueEntry)
+ pendingEnqueues.remove(k)
+ }
+ }
+
+ tx.onPerformed
}
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961152&r1=961151&r2=961152&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:11:25 2010
@@ -173,19 +173,16 @@ class HawtDBStore extends Store with Bas
}
}
- private def doFlush(id: Long, callback:Runnable) = {
+ def flushMessage(id: Long)(cb: => Unit) = dispatchQueue {
val action: HawtDBBatch#MessageAction = pendingStores.get(id)
if( action == null ) {
- callback.run
+// println("flush due to not found: "+id)
+ cb
} else {
- action.tx.eagerFlush(callback)
+ action.tx.eagerFlush(^{ cb })
flush(action.tx.txid)
}
}
-
- def flushMessage(id: Long)(callback: => Unit) = dispatchQueue {
- doFlush(id, ^{ callback } )
- }
def createStoreBatch() = new HawtDBBatch
@@ -198,6 +195,7 @@ class HawtDBStore extends Store with Bas
class HawtDBBatch extends BaseRetained with StoreBatch {
var dispose_start:Long = 0
+ var flushing = false;
class MessageAction {
@@ -240,6 +238,10 @@ class HawtDBStore extends Store with Bas
this.synchronized {
actions += record.key -> action
}
+ dispatchQueue {
+ pendingStores.put(record.key, action)
+ }
+
record.key
}
@@ -255,9 +257,15 @@ class HawtDBStore extends Store with Bas
}
def enqueue(entry: QueueEntryRecord) = {
+
this.synchronized {
- action(entry.messageKey).enqueues += entry
+ val a = action(entry.messageKey)
+ a.enqueues += entry
+ dispatchQueue {
+ pendingEnqueues.put(key(entry), a)
+ }
}
+
}
def dequeue(entry: QueueEntryRecord) = {
@@ -355,19 +363,12 @@ class HawtDBStore extends Store with Bas
delayedTransactions.put(tx.txid, tx)
tx.actions.foreach { case (msg, action) =>
- if( action.messageRecord!=null ) {
- pendingStores.put(msg, action)
- }
- action.enqueues.foreach { queueEntry=>
- pendingEnqueues.put(key(queueEntry), action)
- }
-
// dequeues can cancel out previous enqueues
action.dequeues.foreach { currentDequeue=>
val currentKey = key(currentDequeue)
val prevAction:HawtDBBatch#MessageAction = pendingEnqueues.remove(currentKey)
- if( prevAction!=null ) {
+ if( prevAction!=null && !prevAction.tx.flushing ) {
metric_canceled_enqueue_counter += 1
@@ -422,22 +423,12 @@ class HawtDBStore extends Store with Bas
}
val txs = flush_source.getData.flatMap{ tx_id =>
+
val tx = delayedTransactions.remove(tx_id)
// Message may be flushed or canceled before the timeout flush event..
// tx may be null in those cases
if (tx!=null) {
- tx.actions.foreach { case (msg, action) =>
- if( action.messageRecord !=null ) {
- metric_flushed_message_counter += 1
- pendingStores.remove(msg)
- }
- action.enqueues.foreach { queueEntry=>
- metric_flushed_enqueue_counter += 1
- val k = key(queueEntry)
- pendingEnqueues.remove(k)
- }
- }
-
+ tx.flushing = true
Some(tx)
} else {
None
@@ -448,9 +439,23 @@ class HawtDBStore extends Store with Bas
storeLatency.start { end=>
client.store(txs, ^{
dispatchQueue {
+
end()
txs.foreach { tx=>
+
+ tx.actions.foreach { case (msg, action) =>
+ if( action.messageRecord !=null ) {
+ metric_flushed_message_counter += 1
+ pendingStores.remove(msg)
+ }
+ action.enqueues.foreach { queueEntry=>
+ metric_flushed_enqueue_counter += 1
+ val k = key(queueEntry)
+ pendingEnqueues.remove(k)
+ }
+ }
tx.onPerformed
+
}
}
})