You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/10 19:51:08 UTC

svn commit: r1156274 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/br...

Author: chirino
Date: Wed Aug 10 17:51:07 2011
New Revision: 1156274

URL: http://svn.apache.org/viewvc?rev=1156274&view=rev
Log:
Extend the Store interface so that the store can avoid disk syncs if they are not required.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1156274&r1=1156273&r2=1156274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Wed Aug 10 17:51:07 2011
@@ -24,12 +24,13 @@ import java.util.concurrent.atomic.Atomi
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
-import com.sleepycat.je._
 import java.io.{EOFException, InputStream, OutputStream}
 import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory}
 import org.apache.activemq.apollo.util.Log._
 import scala.Some
 import java.sql.ClientInfoStatus
+import com.sleepycat.je._
+import javax.management.remote.rmi._RMIConnection_Stub
 
 object BDBClient extends Log
 /**
@@ -84,7 +85,7 @@ class BDBClient(store: BDBStore) {
 
     environment = new Environment(directory, env_config);
 
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
       messages_db
       message_refs_db
@@ -183,7 +184,7 @@ class BDBClient(store: BDBStore) {
   }
 
 
-  def with_ctx[T](func: (TxContext) => T): T = {
+  def with_ctx[T](sync:Boolean=true)(func: (TxContext) => T): T = {
     var error:Throwable = null
     var rc:Option[T] = None
 
@@ -192,7 +193,12 @@ class BDBClient(store: BDBStore) {
     while(!rc.isDefined) {
 
 
-      val ctx = TxContext(environment.beginTransaction(null, null));
+      val ctx = if(sync) {
+        TxContext(environment.beginTransaction(null, null));
+      } else {
+        TxContext(environment.beginTransaction(null, new TransactionConfig().setDurability(Durability.COMMIT_NO_SYNC)))
+      }
+
       try {
         rc = Some(func(ctx))
       } catch {
@@ -222,7 +228,7 @@ class BDBClient(store: BDBStore) {
 
   def purge() = {
 
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
 
 
@@ -270,7 +276,7 @@ class BDBClient(store: BDBStore) {
   }
 
   def addQueue(record: QueueRecord, callback:Runnable) = {
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
       queues_db.put(tx, record.key, record)
     }
@@ -292,7 +298,7 @@ class BDBClient(store: BDBStore) {
   }
 
   def removeQueue(queue_key: Long, callback:Runnable) = {
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
 
       queues_db.delete(tx, queue_key)
@@ -313,7 +319,8 @@ class BDBClient(store: BDBStore) {
   }
 
   def store(uows: Seq[BDBStore#DelayableUOW], callback:Runnable) {
-    with_ctx { ctx=>
+    val sync = uows.find( ! _.complete_listeners.isEmpty ).isDefined
+    with_ctx(sync) { ctx=>
       import ctx._
       var zcp_files_to_sync = Set[Int]()
       uows.foreach { uow =>
@@ -360,7 +367,7 @@ class BDBClient(store: BDBStore) {
 
   def listQueues: Seq[Long] = {
     val rc = ListBuffer[Long]()
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
 
       queues_db.cursor(tx) { (key, _) =>
@@ -373,7 +380,7 @@ class BDBClient(store: BDBStore) {
   }
 
   def getQueue(queue_key: Long): Option[QueueRecord] = {
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
       queues_db.get(tx, to_database_entry(queue_key)).map( x=> to_queue_record(x)  )
     }
@@ -381,7 +388,7 @@ class BDBClient(store: BDBStore) {
 
   def listQueueEntryGroups(queue_key: Long, limit: Int) : Seq[QueueEntryRange] = {
     var rc = ListBuffer[QueueEntryRange]()
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
       var group:QueueEntryRange = null
 
@@ -429,7 +436,7 @@ class BDBClient(store: BDBStore) {
 
   def getQueueEntries(queue_key: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
     var rc = ListBuffer[QueueEntryRecord]()
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
       entries_db.cursor_from(tx, (queue_key, firstSeq)) { (key, value) =>
         val current_key:(Long,Long) = key
@@ -453,7 +460,7 @@ class BDBClient(store: BDBStore) {
 
   def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]):Unit = {
 
-    val missing = with_ctx { ctx=>
+    val missing = with_ctx() { ctx=>
       import ctx._
       requests.flatMap { x =>
         val (message_key, callback) = x
@@ -482,7 +489,7 @@ class BDBClient(store: BDBStore) {
 
     // There's a small chance that a message was missing, perhaps we started a read tx, before the
     // write tx completed.  Lets try again..
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
       missing.foreach { x =>
         val (message_key, callback) = x
@@ -504,7 +511,7 @@ class BDBClient(store: BDBStore) {
 
 
   def getLastMessageKey:Long = {
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
 
       messages_db.last_key(tx).map(to_long _).getOrElse(0)
@@ -512,7 +519,7 @@ class BDBClient(store: BDBStore) {
   }
 
   def getLastQueueKey:Long = {
-    with_ctx { ctx=>
+    with_ctx() { ctx=>
       import ctx._
 
       queues_db.last_key(tx).map(to_long _).getOrElse(0)
@@ -521,7 +528,7 @@ class BDBClient(store: BDBStore) {
 
   def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
     try {
-      with_ctx { ctx=>
+      with_ctx() { ctx=>
         import ctx._
         import PBSupport._
 
@@ -584,7 +591,7 @@ class BDBClient(store: BDBStore) {
         } while( !done )
       }
 
-      with_ctx { ctx=>
+      with_ctx() { ctx=>
         import ctx._
         import PBSupport._
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1156274&r1=1156273&r2=1156274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Aug 10 17:51:07 2011
@@ -1113,7 +1113,7 @@ class QueueEntry(val queue:Queue, val se
       if(!storing) {
         storing = true
         delivery.uow.enqueue(toQueueEntryRecord)
-        delivery.uow.on_complete {
+        delivery.uow.on_flush {
           queue.swap_out_completes_source.merge(this)
         }
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1156274&r1=1156273&r2=1156274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Wed Aug 10 17:51:07 2011
@@ -62,7 +62,6 @@ trait DelayingStoreSupport extends Store
 
     var dispose_start:Long = 0
     var flushing = false;
-    var status = "init"
 
     class MessageAction {
 
@@ -84,8 +83,23 @@ trait DelayingStoreSupport extends Store
 
     var completed = false
     var complete_listeners = ListBuffer[() => Unit]()
+    var flushed = false
+    var flush_listeners = ListBuffer[() => Unit]()
     var disable_delay = false
 
+    def on_flush(callback: =>Unit) = {
+      if( this.synchronized {
+        if( flushed ) {
+          true
+        } else {
+          flush_listeners += ( ()=> callback  )
+          false
+        }
+      }) {
+        callback
+      }
+    }
+
     def on_complete(callback: =>Unit) = {
       if( this.synchronized {
         if( completed ) {
@@ -113,10 +127,10 @@ trait DelayingStoreSupport extends Store
     }
 
     def cancel = {
-      status += "|cancel"
+      dispatch_queue.assertExecuting()
       flushing = true
       delayed_uows.remove(uow_id)
-      on_performed
+      on_completed
     }
 
     def store(record: MessageRecord):Long = {
@@ -165,16 +179,25 @@ trait DelayingStoreSupport extends Store
     }
 
     override def dispose = {
-      status += "|commited"
       dispose_start = System.nanoTime
       uow_source.merge(this)
     }
 
-    def on_performed() = this.synchronized {
-      status += "|performed"
-      commit_latency_counter += System.nanoTime-dispose_start
-      complete_listeners.foreach(_())
-      super.dispose
+    def on_flushed() = this.synchronized {
+      if( !flushed ) {
+        flushed = true
+        flush_listeners.foreach(_())
+      }
+    }
+
+    def on_completed() = this.synchronized {
+      if ( !completed ) {
+        on_flushed
+        completed = true
+        commit_latency_counter += System.nanoTime-dispose_start
+        complete_listeners.foreach(_())
+        super.dispose
+      }
     }
   }
 
@@ -189,7 +212,6 @@ trait DelayingStoreSupport extends Store
       pending_stores.get(message_key) match {
         case null => cb()
         case action =>
-          action.uow.status += "|flush_message"
           action.uow.on_complete( cb() )
           flush(action.uow)
       }
@@ -279,7 +301,6 @@ trait DelayingStoreSupport extends Store
     dispatch_queue.assertExecuting()
     uow_source.getData.foreach { uow =>
 
-      uow.status += "|delayed"
       delayed_uows.put(uow.uow_id, uow)
 
       uow.actions.foreach { case (msg, action) =>
@@ -341,7 +362,6 @@ trait DelayingStoreSupport extends Store
     if( uow!=null && !uow.flushing ) {
       uow.flushing = true
       delayed_uows.remove(uow.uow_id)
-      uow.status += "|flushing"
       flush_source.merge(uow)
     }
   }
@@ -365,11 +385,11 @@ trait DelayingStoreSupport extends Store
       flush_latency_counter.start { end=>
         flush_source.suspend
         store(uows) {
-          dispatch_queue.assertExecuting()
+          store_completed(uows)
+
           flush_source.resume
-          end()
+          dispatch_queue.assertExecuting()
           uows.foreach { uow=>
-            uow.status += "|flushed"
             uow.actions.foreach { case (msg, action) =>
               if( action.message_record !=null ) {
                 metric_flushed_message_counter += 1
@@ -377,16 +397,21 @@ trait DelayingStoreSupport extends Store
               }
               action.enqueues.foreach { queue_entry=>
                 metric_flushed_enqueue_counter += 1
-                val k = key(queue_entry)
-                pending_enqueues.remove(k)
+                pending_enqueues.remove(key(queue_entry))
               }
             }
-            uow.on_performed
-
           }
+          end()
         }
       }
     }
   }
 
+  def store_completed(uows: ListBuffer[DelayingStoreSupport.this.type#DelayableUOW]) {
+    uows.foreach { uow =>
+        uow.on_completed
+    }
+  }
+
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1156274&r1=1156273&r2=1156274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala Wed Aug 10 17:51:07 2011
@@ -67,7 +67,15 @@ trait StoreUOW extends Retained {
 
   /**
    * The specified callback is executed once the UOW
-   * is completed.
+   * has been written to disk and flushed out of the application
+   * buffers.
+   */
+  def on_flush(callback: =>Unit)
+
+  /**
+   * The specified callback is executed once the UOW
+   * has fully completed, that is, it has been flushed
+   * and synced to disk.
    */
   def on_complete(callback: =>Unit)