You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:25 UTC

svn commit: r961152 - in /activemq/sandbox/activemq-apollo-actor: activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala

Author: chirino
Date: Wed Jul  7 04:11:25 2010
New Revision: 961152

URL: http://svn.apache.org/viewvc?rev=961152&view=rev
Log:
Track the in flight message stores and enqueue better so that we don't erroniously report them being flushed before they are actually fully flushed and indexed.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala?rev=961152&r1=961151&r2=961152&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraStore.scala Wed Jul  7 04:11:25 2010
@@ -241,6 +241,7 @@ class CassandraStore extends Store with 
 
     val txid:Int = next_tx_id.getAndIncrement
     var actions = Map[Long, MessageAction]()
+    var flushing= false
 
     var flushListeners = ListBuffer[Runnable]()
     def eagerFlush(callback: Runnable) = if( callback!=null ) { this.synchronized { flushListeners += callback } }
@@ -263,6 +264,9 @@ class CassandraStore extends Store with 
       this.synchronized {
         actions += record.key -> action
       }
+      dispatchQueue {
+        pendingStores.put(record.key, action)
+      }
       record.key
     }
 
@@ -279,7 +283,11 @@ class CassandraStore extends Store with 
 
     def enqueue(entry: QueueEntryRecord) = {
       this.synchronized {
-        action(entry.messageKey).enqueues += entry
+        val a = action(entry.messageKey)
+        a.enqueues += entry
+        dispatchQueue {
+          pendingEnqueues.put(key(entry), a)
+        }
       }
     }
 
@@ -321,19 +329,12 @@ class CassandraStore extends Store with 
       delayedTransactions.put(tx_id, tx)
 
       tx.actions.foreach { case (msg, action) =>
-        if( action.store!=null ) {
-          pendingStores.put(msg, action)
-        }
-        action.enqueues.foreach { queueEntry=>
-          pendingEnqueues.put(key(queueEntry), action)
-        }
-
 
         // dequeues can cancel out previous enqueues
         action.dequeues.foreach { currentDequeue=>
           val currentKey = key(currentDequeue)
           val prevAction:CassandraBatch#MessageAction = pendingEnqueues.remove(currentKey)
-          if( prevAction!=null ) {
+          if( prevAction!=null && !prevAction.tx.flushing ) {
 
             // yay we can cancel out a previous enqueue
             prevAction.enqueues = prevAction.enqueues.filterNot( x=> key(x) == currentKey )
@@ -386,17 +387,7 @@ class CassandraStore extends Store with 
       // Message may be flushed or canceled before the timeout flush event..
       // tx may be null in those cases
       if (tx!=null) {
-
-        tx.actions.foreach { case (msg, action) =>
-          if( action.store!=null ) {
-            pendingStores.remove(msg)
-          }
-          action.enqueues.foreach { queueEntry=>
-            val k = key(queueEntry)
-            pendingEnqueues.remove(k)
-          }
-        }
-
+        tx.flushing = true
         Some(tx)
       } else {
         None
@@ -409,8 +400,19 @@ class CassandraStore extends Store with 
           client.store(txs)
           dispatchQueue {
             end()
-            txs.foreach { x=>
-              x.onPerformed
+            txs.foreach { tx=>
+
+              tx.actions.foreach { case (msg, action) =>
+                if( action.store!=null ) {
+                  pendingStores.remove(msg)
+                }
+                action.enqueues.foreach { queueEntry=>
+                  val k = key(queueEntry)
+                  pendingEnqueues.remove(k)
+                }
+              }
+
+              tx.onPerformed
             }
           }
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961152&r1=961151&r2=961152&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:11:25 2010
@@ -173,19 +173,16 @@ class HawtDBStore extends Store with Bas
     }
   }
 
-  private def doFlush(id: Long, callback:Runnable) = {
+  def flushMessage(id: Long)(cb: => Unit) = dispatchQueue {
     val action: HawtDBBatch#MessageAction = pendingStores.get(id)
     if( action == null ) {
-      callback.run
+//      println("flush due to not found: "+id)
+      cb
     } else {
-      action.tx.eagerFlush(callback)
+      action.tx.eagerFlush(^{ cb })
       flush(action.tx.txid)
     }
   }
-  
-  def flushMessage(id: Long)(callback: => Unit) = dispatchQueue {
-    doFlush(id, ^{ callback } )
-  }
 
   def createStoreBatch() = new HawtDBBatch
 
@@ -198,6 +195,7 @@ class HawtDBStore extends Store with Bas
   class HawtDBBatch extends BaseRetained with StoreBatch {
 
     var dispose_start:Long = 0
+    var flushing = false;
 
     class MessageAction {
 
@@ -240,6 +238,10 @@ class HawtDBStore extends Store with Bas
       this.synchronized {
         actions += record.key -> action
       }
+      dispatchQueue {
+        pendingStores.put(record.key, action)
+      }
+
       record.key
     }
 
@@ -255,9 +257,15 @@ class HawtDBStore extends Store with Bas
     }
 
     def enqueue(entry: QueueEntryRecord) = {
+
       this.synchronized {
-        action(entry.messageKey).enqueues += entry
+        val a = action(entry.messageKey)
+        a.enqueues += entry
+        dispatchQueue {
+          pendingEnqueues.put(key(entry), a)
+        }
       }
+
     }
 
     def dequeue(entry: QueueEntryRecord) = {
@@ -355,19 +363,12 @@ class HawtDBStore extends Store with Bas
       delayedTransactions.put(tx.txid, tx)
 
       tx.actions.foreach { case (msg, action) =>
-        if( action.messageRecord!=null ) {
-          pendingStores.put(msg, action)
-        }
-        action.enqueues.foreach { queueEntry=>
-          pendingEnqueues.put(key(queueEntry), action)
-        }
-
 
         // dequeues can cancel out previous enqueues
         action.dequeues.foreach { currentDequeue=>
           val currentKey = key(currentDequeue)
           val prevAction:HawtDBBatch#MessageAction = pendingEnqueues.remove(currentKey)
-          if( prevAction!=null ) {
+          if( prevAction!=null && !prevAction.tx.flushing ) {
 
             metric_canceled_enqueue_counter += 1
 
@@ -422,22 +423,12 @@ class HawtDBStore extends Store with Bas
     }
     
     val txs = flush_source.getData.flatMap{ tx_id =>
+
       val tx = delayedTransactions.remove(tx_id)
       // Message may be flushed or canceled before the timeout flush event..
       // tx may be null in those cases
       if (tx!=null) {
-        tx.actions.foreach { case (msg, action) =>
-          if( action.messageRecord !=null ) {
-            metric_flushed_message_counter += 1
-            pendingStores.remove(msg)
-          }
-          action.enqueues.foreach { queueEntry=>
-            metric_flushed_enqueue_counter += 1
-            val k = key(queueEntry)
-            pendingEnqueues.remove(k)
-          }
-        }
-
+        tx.flushing = true
         Some(tx)
       } else {
         None
@@ -448,9 +439,23 @@ class HawtDBStore extends Store with Bas
       storeLatency.start { end=>
         client.store(txs, ^{
           dispatchQueue {
+
             end()
             txs.foreach { tx=>
+
+              tx.actions.foreach { case (msg, action) =>
+                if( action.messageRecord !=null ) {
+                  metric_flushed_message_counter += 1
+                  pendingStores.remove(msg)
+                }
+                action.enqueues.foreach { queueEntry=>
+                  metric_flushed_enqueue_counter += 1
+                  val k = key(queueEntry)
+                  pendingEnqueues.remove(k)
+                }
+              }
               tx.onPerformed
+
             }
           }
         })