You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/09 03:13:36 UTC

svn commit: r1133624 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-cassandra/src/main/scala/org/apache/activemq/...

Author: chirino
Date: Thu Jun  9 01:13:35 2011
New Revision: 1133624

URL: http://svn.apache.org/viewvc?rev=1133624&view=rev
Log:
Tightening up the DelayingStoreSupport.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Thu Jun  9 01:13:35 2011
@@ -320,12 +320,12 @@ class BDBClient(store: BDBStore) {
           uow.actions.foreach {
             case (msg, action) =>
 
-              val message_record = action.messageRecord
+              val message_record = action.message_record
               if (message_record != null) {
                 import PBSupport._
 
                 val pb = if( message_record.zero_copy_buffer != null ) {
-                  val r = to_pb(action.messageRecord).copy
+                  val r = to_pb(action.message_record).copy
                   val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
                   r.setZcpFile(buffer.file)
                   r.setZcpOffset(buffer.offset)
@@ -334,10 +334,10 @@ class BDBClient(store: BDBStore) {
                   zcp_files_to_sync += buffer.file
                   r.freeze
                 } else {
-                  to_pb(action.messageRecord)
+                  to_pb(action.message_record)
                 }
 
-                messages_db.put(tx, action.messageRecord.key, pb)
+                messages_db.put(tx, action.message_record.key, pb)
               }
 
               action.enqueues.foreach { queueEntry =>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Thu Jun  9 01:13:35 2011
@@ -21,6 +21,7 @@ import java.util.HashMap
 import collection.Seq
 import org.fusesource.hawtdispatch._
 import java.util.concurrent._
+import atomic.AtomicInteger
 import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator}
 import org.apache.activemq.apollo.dto.{StoreStatusDTO, TimeMetricDTO, IntMetricDTO}
@@ -61,16 +62,17 @@ trait DelayingStoreSupport extends Store
 
     var dispose_start:Long = 0
     var flushing = false;
+    var status = "init"
 
     class MessageAction {
 
       var msg= 0L
-      var messageRecord: MessageRecord = null
+      var message_record: MessageRecord = null
       var enqueues = ListBuffer[QueueEntryRecord]()
       var dequeues = ListBuffer[QueueEntryRecord]()
 
       def uow = DelayableUOW.this
-      def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
+      def isEmpty() = message_record==null && enqueues==Nil && dequeues==Nil
 
       def cancel() = {
         uow.rm(msg)
@@ -81,15 +83,15 @@ trait DelayingStoreSupport extends Store
     var actions = Map[Long, MessageAction]()
 
     var completed = false
-    var completeListeners = ListBuffer[() => Unit]()
-    var disableDelay = false
+    var complete_listeners = ListBuffer[() => Unit]()
+    var disable_delay = false
 
     def on_complete(callback: =>Unit) = {
       if( this.synchronized {
         if( completed ) {
           true
         } else {
-          completeListeners += ( ()=> callback  )
+          complete_listeners += ( ()=> callback  )
           false
         }
       }) {
@@ -97,11 +99,11 @@ trait DelayingStoreSupport extends Store
       }
     }
 
-    def complete_asap() = this.synchronized { disableDelay=true }
+    def complete_asap() = this.synchronized { disable_delay=true }
 
     var delayable_actions = 0
 
-    def delayable = !disableDelay && delayable_actions>0 && flush_delay>=0
+    def delayable = !disable_delay && delayable_actions>0 && flush_delay>=0
 
     def rm(msg:Long) = {
       actions -= msg
@@ -111,20 +113,22 @@ trait DelayingStoreSupport extends Store
     }
 
     def cancel = {
-      delayedUOWs.remove(uow_id)
-      onPerformed
+      status += "|cancel"
+      flushing = true
+      delayed_uows.remove(uow_id)
+      on_performed
     }
 
     def store(record: MessageRecord):Long = {
       record.key = get_next_msg_key
       val action = new MessageAction
       action.msg = record.key
-      action.messageRecord = record
+      action.message_record = record
       this.synchronized {
         actions += record.key -> action
       }
       aggregator {
-        pendingStores.put(record.key, action)
+        pending_stores.put(record.key, action)
       }
       delayable_actions += 1
       record.key
@@ -161,31 +165,37 @@ trait DelayingStoreSupport extends Store
     }
 
     override def dispose = {
+      status += "|commited"
       dispose_start = System.nanoTime
       uow_source.merge(this)
     }
 
-    def onPerformed() = this.synchronized {
+    def on_performed() = this.synchronized {
+      status += "|performed"
       commit_latency_counter += System.nanoTime-dispose_start
-      completeListeners.foreach(_())
+      complete_listeners.foreach(_())
       super.dispose
     }
   }
 
+  def flush_message(message_key: Long)(cb: => Unit) = flush_message_source.merge((message_key, cb _))
 
-  def flush_message(messageKey: Long)(cb: => Unit) = dispatch_queue {
-    val action: DelayableUOW#MessageAction = pendingStores.get(messageKey)
-    if( action == null ) {
-      cb
-    } else {
-      // TODO: protect against this causing a 2nd flush.
-      delayedUOWs.put(action.uow.uow_id, action.uow)
-      action.uow.on_complete( cb )
-      flush(action.uow.uow_id)
+  val flush_message_source = createSource(new ListEventAggregator[(Long, ()=>Unit)](), dispatch_queue)
+  flush_message_source.setEventHandler(^{drain_flush_message});
+  flush_message_source.resume
+  
+  def drain_flush_message:Unit = {
+    flush_message_source.getData.foreach { case (message_key, cb) =>
+      pending_stores.get(message_key) match {
+        case null => cb()
+        case action =>
+          action.uow.status += "|flush_message"
+          action.uow.on_complete( cb() )
+          flush(action.uow)
+      }
     }
   }
 
-
   implicit def toTimeMetricDTO( m: TimeMetric) = {
     val rc = new TimeMetricDTO()
     rc.count = m.count
@@ -235,7 +245,21 @@ trait DelayingStoreSupport extends Store
     rc.canceled_enqueue_counter = metric_canceled_enqueue_counter
     rc.flushed_message_counter = metric_flushed_message_counter
     rc.flushed_enqueue_counter = metric_flushed_enqueue_counter
-    rc.pending_stores = pendingStores.size
+    rc.pending_stores = pending_stores.size
+
+//    import collection.JavaConversions._
+//    var last = ""
+//    var count = 0
+//    pending_stores.valuesIterator.map(_.uow.status).foreach{ line =>
+//      if( last!= "" && last!=line) {
+//        println(last+" occured "+count+" times")
+//        count = 0
+//      }
+//      count += 1
+//      last = line
+//    }
+//    println(last+" occured "+count+" times")
+//    println("--------------")
   }
 
   def key(x:QueueEntryRecord) = (x.queue_key, x.entry_seq)
@@ -244,17 +268,18 @@ trait DelayingStoreSupport extends Store
   uow_source.setEventHandler(^{drain_uows});
   uow_source.resume
 
-  var pendingStores = new HashMap[Long, DelayableUOW#MessageAction]()
+  var pending_stores = new HashMap[Long, DelayableUOW#MessageAction]()
   var pending_enqueues = new HashMap[(Long,Long), DelayableUOW#MessageAction]()
-  var delayedUOWs = new HashMap[Int, DelayableUOW]()
+  var delayed_uows = new HashMap[Int, DelayableUOW]()
 
-  var next_batch_id = new IntCounter(1)
+  val next_batch_id = new AtomicInteger(1)
 
   def drain_uows = {
     dispatch_queue.assertExecuting()
     uow_source.getData.foreach { uow =>
 
-      delayedUOWs.put(uow.uow_id, uow)
+      uow.status += "|delayed"
+      delayed_uows.put(uow.uow_id, uow)
 
       uow.actions.foreach { case (msg, action) =>
 
@@ -263,31 +288,31 @@ trait DelayingStoreSupport extends Store
           val currentKey = key(currentDequeue)
           val prev_action:DelayableUOW#MessageAction = pending_enqueues.remove(currentKey)
 
-          def prev_batch = prev_action.uow
+          def prev_uow = prev_action.uow
 
-          if( prev_action!=null && !prev_batch.flushing ) {
+          if( prev_action!=null && !prev_uow.flushing ) {
 
 
-            prev_batch.delayable_actions -= 1
+            prev_uow.delayable_actions -= 1
             metric_canceled_enqueue_counter += 1
 
             // yay we can cancel out a previous enqueue
             prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == currentKey )
 
             // if the message is not in any queues.. we can gc it..
-            if( prev_action.enqueues == Nil && prev_action.messageRecord !=null ) {
-              pendingStores.remove(msg)
-              prev_action.messageRecord = null
-              prev_batch.delayable_actions -= 1
+            if( prev_action.enqueues == Nil && prev_action.message_record !=null ) {
+              pending_stores.remove(msg)
+              prev_action.message_record = null
+              prev_uow.delayable_actions -= 1
               metric_canceled_message_counter += 1
             }
 
             // Cancel the action if it's now empty
             if( prev_action.isEmpty ) {
               prev_action.cancel()
-            } else if( !prev_batch.delayable ) {
+            } else if( !prev_uow.delayable ) {
               // flush it if there is no point in delyaing anymore
-              flush(prev_batch.uow_id)
+              flush(prev_uow)
             }
 
             // since we canceled out the previous enqueue.. now cancel out the action
@@ -299,21 +324,28 @@ trait DelayingStoreSupport extends Store
         }
       }
 
-      val batch_id = uow.uow_id
+      val uow_id = uow.uow_id
       if( uow.delayable ) {
-        dispatch_queue.executeAfter(flush_delay, TimeUnit.MILLISECONDS, ^{flush(batch_id)})
+        dispatch_queue.executeAfter(flush_delay, TimeUnit.MILLISECONDS, ^{
+          flush(delayed_uows.get(uow_id))
+        })
       } else {
-        flush(batch_id)
+        flush(uow)
       }
 
     }
   }
 
-  private def flush(batch_id:Int) = {
-    flush_source.merge(batch_id)
+  private def flush(uow:DelayableUOW) = {
+    if( uow!=null && !uow.flushing ) {
+      uow.flushing = true
+      delayed_uows.remove(uow.uow_id)
+      uow.status += "|flushing"
+      flush_source.merge(uow)
+    }
   }
 
-  val flush_source = createSource(new ListEventAggregator[Int](), dispatch_queue)
+  val flush_source = createSource(new ListEventAggregator[DelayableUOW](), dispatch_queue)
   flush_source.setEventHandler(^{drain_flushes});
   flush_source.resume
 
@@ -327,19 +359,7 @@ trait DelayingStoreSupport extends Store
       return
     }
 
-    val uows = flush_source.getData.flatMap{ uow_id =>
-
-      val uow = delayedUOWs.remove(uow_id)
-      // Message may be flushed or canceled before the timeout flush event..
-      // uow may be null in those cases
-      if (uow!=null) {
-        uow.flushing = true
-        Some(uow)
-      } else {
-        None
-      }
-    }
-
+    val uows = flush_source.getData
     if( !uows.isEmpty ) {
       flush_latency_counter.start { end=>
         flush_source.suspend
@@ -348,18 +368,19 @@ trait DelayingStoreSupport extends Store
           flush_source.resume
           end()
           uows.foreach { uow=>
+            uow.status += "|flushed"
             uow.actions.foreach { case (msg, action) =>
-              if( action.messageRecord !=null ) {
+              if( action.message_record !=null ) {
                 metric_flushed_message_counter += 1
-                pendingStores.remove(msg)
+                pending_stores.remove(msg)
               }
-              action.enqueues.foreach { queueEntry=>
+              action.enqueues.foreach { queue_entry=>
                 metric_flushed_enqueue_counter += 1
-                val k = key(queueEntry)
+                val k = key(queue_entry)
                 pending_enqueues.remove(k)
               }
             }
-            uow.onPerformed
+            uow.on_performed
 
           }
         }

Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala Thu Jun  9 01:13:35 2011
@@ -116,8 +116,8 @@ class CassandraClient() {
             tx.actions.foreach {
               case (msg, action) =>
                 var rc =
-                if (action.messageRecord != null) {
-                  operations ::= Insert( schema.message_data \ (msg, action.messageRecord ) )
+                if (action.message_record != null) {
+                  operations ::= Insert( schema.message_data \ (msg, action.message_record ) )
                 }
                 action.enqueues.foreach {
                   queueEntry =>

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Thu Jun  9 01:13:35 2011
@@ -234,8 +234,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       tx =>
         tx.actions.foreach {
           case (msg, action) =>
-            if (action.messageRecord != null) {
-              val update: AddMessage.Bean = action.messageRecord
+            if (action.message_record != null) {
+              val update: AddMessage.Bean = action.message_record
               batch += update
             }
             action.enqueues.foreach {

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1133624&r1=1133623&r2=1133624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Thu Jun  9 01:13:35 2011
@@ -355,11 +355,11 @@ class JDBM2Client(store: JDBM2Store) {
       uows.foreach { uow =>
         uow.actions.foreach { case (msg, action) =>
 
-          val message_record = action.messageRecord
+          val message_record = action.message_record
           if (message_record != null) {
 
             val pb = if( message_record.zero_copy_buffer != null ) {
-              val r = to_pb(action.messageRecord).copy
+              val r = to_pb(action.message_record).copy
               val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
               r.setZcpFile(buffer.file)
               r.setZcpOffset(buffer.offset)
@@ -368,12 +368,12 @@ class JDBM2Client(store: JDBM2Store) {
               zcp_files_to_sync += buffer.file
               r.freeze
             } else {
-              to_pb(action.messageRecord)
+              to_pb(action.message_record)
             }
 
-            messages_db.put(action.messageRecord.key, pb)
-            if( action.messageRecord.key > last_message_key ) {
-              last_message_key = action.messageRecord.key
+            messages_db.put(action.message_record.key, pb)
+            if( action.message_record.key > last_message_key ) {
+              last_message_key = action.message_record.key
               recman.setNamedObject("last_message_key", last_message_key)
             }
           }