You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:00 UTC

svn commit: r961147 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/ activemq-store/src/test/scala/org/apache/activem...

Author: chirino
Date: Wed Jul  7 04:10:59 2010
New Revision: 961147

URL: http://svn.apache.org/viewvc?rev=961147&view=rev
Log:
- Queue: Better prefetch handling.  
- HawtDB Store: Better recovery and root record handling

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:10:59 2010
@@ -197,21 +197,14 @@ class Queue(val host: VirtualHost, val d
           queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
         }
 
-//        var haveQuickConsumer = false
-//        fastSubs.foreach{ sub=>
-//          if( sub.pos.seq < entry.seq ) {
-//            haveQuickConsumer = true
-//          }
-//        }
 
-        def haveQuickConsumer = fastSubs.find( sub=> !sub.slow && sub.pos.seq <= entry.seq ).isDefined
+        def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= entry.seq ).isDefined
 
         var dispatched = false
         if( entry.prefetched > 0 || haveQuickConsumer ) {
           // try to dispatch it directly...
           entry.dispatch
         } else {
-//          println("flush: "+delivery.message.getProperty("color"))
           // we flush the entry out right away if it looks
           // it wont be needed.
           entry.flush
@@ -228,22 +221,26 @@ class Queue(val host: VirtualHost, val d
     }
   }
 
+
+  var checkCounter = 0
   def schedualSlowConsumerCheck:Unit = {
 
     def slowConsumerCheck = {
       if( retained > 0 ) {
+        checkCounter += 1
 
         // target tune_min_subscription_rate / sec
         val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
 
         var idleConsumerCount = 0
 
-
-//        if( consumerSubs.isEmpty ) {
+// Handy for periodically looking at the dispatch state...
+//
+//        if( !consumerSubs.isEmpty && (checkCounter%100)==0 ) {
 //          println("using "+size+" out of "+capacity+" buffer space.");
 //          var cur = entries.getHead
 //          while( cur!=null ) {
-//            if( cur.asLoaded!=null ) {
+//            if( cur.asLoaded!=null || cur.hasSubs || cur.prefetched>0 ) {
 //              println("  => "+cur)
 //            }
 //            cur = cur.getNext
@@ -291,6 +288,20 @@ class Queue(val host: VirtualHost, val d
           }
         }
 
+        // flush tail entries that are still loaded but which have no fast subs that can process them.
+        var cur = entries.getTail
+        while( cur!=null ) {
+          def haveQuickConsumer = fastSubs.find( sub=> sub.pos.seq <= cur.seq ).isDefined
+          if( !cur.hasSubs && cur.prefetched==0 && cur.asFlushed==null && !haveQuickConsumer ) {
+            // then flush out to make space...
+            cur.flush
+            cur = cur.getPrevious
+          } else {
+            cur = null
+          }
+        }
+
+
         // Trigger a swap if we have slow consumers and we are full..
         if( idleConsumerCount > 0 && messages.full && flushingSize==0 ) {
           swap
@@ -339,8 +350,6 @@ class Queue(val host: VirtualHost, val d
         entry.unlink
         ack(entry.value, tx)
     }
-    
-//    println("acked... full: "+messages.full)
     messages.refiller.run
   }
   
@@ -512,10 +521,13 @@ class Queue(val host: VirtualHost, val d
 
 
 object QueueEntry extends Sizer[QueueEntry] {
+
   def size(value: QueueEntry): Int = value.size
 }
 
-class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
+class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable with DispatchLogging {
+  override protected def log = Queue
+
   import QueueEntry._
 
   var competing:List[Subscription] = Nil
@@ -567,11 +579,14 @@ class QueueEntry(val queue:Queue, val se
 
     // remove from prefetch counters..
     var cur = this;
-    while( prefetched > 0 ) {
+    while( cur!=null && prefetched > 0 ) {
       if( cur.hasSubs ) {
         (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(cur) ) { sub.removePrefetch(cur) } }
       }
       cur = cur.getPrevious
+      if( cur == null ) {
+        error("illegal prefetch state detected.")
+      }
     }
 
     this.value = new Tombstone()
@@ -749,6 +764,13 @@ class QueueEntry(val queue:Queue, val se
           // loads to reduce cross thread synchronization
           if( delivery.isDefined ) {
             queue.store_load_source.merge((QueueEntry.this, delivery.get))
+          } else {
+            // Looks like someone else removed the message from the store.. lets just
+            // tombstone this entry now.
+            queue.dispatchQueue {
+              debug("Detected store drop of message key: %d", ref)
+              tombstone
+            }
           }
         }
       }
@@ -873,7 +895,7 @@ class QueueEntry(val queue:Queue, val se
           p.addCompeting(competingFastSubs)
 
           // flush this entry out if it's not going to be needed soon.
-          def haveQuickConsumer = queue.fastSubs.find( sub=> !sub.slow && sub.pos.seq <= seq ).isDefined
+          def haveQuickConsumer = queue.fastSubs.find( sub=> sub.pos.seq <= seq ).isDefined
           if( !hasSubs && prefetched==0 && !aquired && !haveQuickConsumer ) {
             // then flush out to make space...
             flush
@@ -947,9 +969,9 @@ class Subscription(queue:Queue) extends 
   }
 
   def removePrefetch(value:QueueEntry):Unit = {
-//    trace("prefetch rm: "+value.seq)
     value.prefetched -= 1
     prefetchSize -= value.size
+
     fillPrefetch()
   }
 
@@ -961,7 +983,6 @@ class Subscription(queue:Queue) extends 
   }
 
   def addPrefetch(value:QueueEntry):Unit = {
-//    trace("prefetch add: "+value.seq)
     prefetchSize += value.size
     value.prefetched += 1
     value.load

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:10:59 2010
@@ -44,6 +44,7 @@ import java.util.{TreeSet, HashSet}
 
 import org.fusesource.hawtdb.api._
 import org.apache.activemq.apollo.broker.{DispatchLogging, Log, Logging, BaseService}
+import org.apache.activemq.apollo.util.TimeCounter
 
 object HawtDBClient extends Log {
   val BEGIN = -1
@@ -81,7 +82,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   private var lastRecoveryPosition: Location = null
   private var recoveryCounter = 0
 
-  var databaseRootRecord = new DatabaseRootRecord.Bean
+  @volatile
+  var rootBuffer = (new DatabaseRootRecord.Bean()).freeze
 
 
   val next_batch_counter = new AtomicInteger(0)
@@ -167,17 +169,17 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             val rootPage = tx.alloc()
             assert(rootPage == 0)
 
-            databaseRootRecord.setQueueIndexPage(alloc(QUEUE_INDEX_FACTORY))
-            databaseRootRecord.setMessageKeyIndexPage(alloc(MESSAGE_KEY_INDEX_FACTORY))
-            databaseRootRecord.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
-            databaseRootRecord.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
-            databaseRootRecord.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
+            rootBean.setQueueIndexPage(alloc(QUEUE_INDEX_FACTORY))
+            rootBean.setMessageKeyIndexPage(alloc(MESSAGE_KEY_INDEX_FACTORY))
+            rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
+            rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
+            rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
+            rootBuffer = rootBean.freeze
 
-            tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
-            databaseRootRecord = databaseRootRecord.copy
+            tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
             true
           } else {
-            databaseRootRecord = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0).copy
+            rootBuffer = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0)
             false
           }
       }
@@ -306,23 +308,35 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
   }
 
+  val metric_load_from_index = new TimeCounter
+  val metric_load_from_journal = new TimeCounter
+
   def loadMessage(messageKey: Long): Option[MessageRecord] = {
-    withTx { tx =>
+    metric_load_from_index.start { end =>
+      withTx { tx =>
+        val idxPage = rootBuffer.getMessageKeyIndexPage
+
         val helper = new TxHelper(tx)
         import JavaConversions._
         import helper._
 
         val location = messageKeyIndex.get(messageKey)
+        end()
+
         if (location != null) {
-          load(location, classOf[AddMessage.Getter]) match {
-            case Some(x) =>
-              val messageRecord: MessageRecord = x
-              Some(messageRecord)
-            case None => None
+          metric_load_from_journal.time {
+            load(location, classOf[AddMessage.Getter]) match {
+              case Some(x) =>
+                val messageRecord: MessageRecord = x
+                Some(messageRecord)
+              case None => None
+            }
           }
         } else {
+          debug("Message not indexed.  Journal location could not be determined for message: %s", messageKey)
           None
         }
+      }
     }
   }
 
@@ -336,12 +350,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   private def load[T <: TypeCreatable](location: Location, expected: Class[T]): Option[T] = {
     try {
       load(location) match {
-        case (updateType, batch, data) =>
-          Some(expected.cast(decode(location, updateType, data)))
+          case (updateType, batch, data) =>
+            val decoded = expected.cast(decode(location, updateType, data))
+            val rc = Some(decoded)
+            rc
       }
     } catch {
-      case e: Exception =>
-        debug("Could not load journal record at: %s", location)
+      case e: Throwable =>
+        debug(e, "Could not load journal record at: %s", location)
         None
     }
   }
@@ -368,10 +384,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   private def _store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = {
     val kind = update.asInstanceOf[TypeCreatable]
     val frozen = update.freeze
-    val baos = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 1)
+    val baos = new DataByteArrayOutputStream(frozen.serializedSizeFramed + 5)
     baos.writeByte(kind.toType().getNumber())
     baos.writeInt(batch)
-    frozen.writeUnframed(baos)
+    frozen.writeFramed(baos)
 
     val buffer = baos.toBuffer()
     append(buffer) {
@@ -464,14 +480,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
     // Is this our first incremental recovery pass?
     if (lastRecoveryPosition == null) {
-      if (databaseRootRecord.hasFirstBatchLocation) {
+      if (rootBuffer.hasFirstBatchLocation) {
         // we have to start at the first in progress batch usually...
-        nextRecoveryPosition = databaseRootRecord.getFirstBatchLocation
+        nextRecoveryPosition = rootBuffer.getFirstBatchLocation
       } else {
         // but perhaps there were no batches in progress..
-        if (databaseRootRecord.hasLastUpdateLocation) {
+        if (rootBuffer.hasLastUpdateLocation) {
           // then we can just continue from the last update applied to the index
-          lastRecoveryPosition = databaseRootRecord.getLastUpdateLocation
+          lastRecoveryPosition = rootBuffer.getLastUpdateLocation
           nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
         } else {
           // no updates in the index?.. start from the first record in the journal.
@@ -533,16 +549,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     })
   }
 
-  /**
-   */
-  def benchmarkLatency[R](func: (String=>Unit)=>R ):R = {
-    val start = System.nanoTime
-    func { label=>
-      var end = System.nanoTime
-      warn("latencey: %s is %,.3f ms", label, ( (end - start).toFloat / TimeUnit.MILLISECONDS.toNanos(1)))
-    }
-  }
-
   def read(location: Location) = journal.read(location)
 
   /////////////////////////////////////////////////////////////////////
@@ -565,12 +571,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         // after the last update location.
         if (!recovering || isAfterLastUpdateLocation(location)) {
           withTx { tx =>
-            // index the updates
+              val helper = new TxHelper(tx)
+              // index the updates
               updates.foreach {
                 update =>
-                  index(tx, update.update, update.location)
+                  index(helper, update.update, update.location)
               }
-              updateLocations(tx, location)
+              helper.updateLocations(location)
           }
         }
       case None =>
@@ -589,7 +596,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       case Some((_, _)) =>
         if (!recovering || isAfterLastUpdateLocation(location)) {
           withTx { tx =>
-              updateLocations(tx, location)
+            val helper = new TxHelper(tx)
+            helper.updateLocations(location)
           }
         }
       case None =>
@@ -610,9 +618,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       // after the last update location.
       if (!recovering || isAfterLastUpdateLocation(location)) {
         withTx { tx =>
-          // index the update now.
-            index(tx, update, location)
-            updateLocations(tx, location)
+            val helper = new TxHelper(tx)
+            index(helper, update, location)
+            helper.updateLocations(location)
         }
       }
 
@@ -637,9 +645,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   }
 
 
-  private def index(tx: Transaction, update: TypeCreatable, location: Location): Unit = {
-
-    val helper = new TxHelper(tx)
+  private def index(helper:TxHelper, update: TypeCreatable, location: Location): Unit = {
     import JavaConversions._
     import helper._
 
@@ -649,7 +655,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         val fileId:jl.Integer = location.getDataFileId()
         addAndGet(dataFileRefIndex, fileId, -1)
       } else {
-        error("Cannot remove message, it did not exist: %d", key)
+        if( !recovering ) {
+          error("Cannot remove message, it did not exist: %d", key)
+        }
       }
     }
 
@@ -677,8 +685,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       case x: AddMessage.Getter =>
 
         val messageKey = x.getMessageKey()
-        if (messageKey > databaseRootRecord.getLastMessageKey) {
-          databaseRootRecord.setLastMessageKey(messageKey)
+        if (messageKey > rootBean.getLastMessageKey) {
+          rootBean.setLastMessageKey(messageKey)
         }
 
         val prevLocation = messageKeyIndex.put(messageKey, location)
@@ -743,26 +751,33 @@ class HawtDBClient(hawtDBStore: HawtDBSt
           if (queueEntry != null) {
             val messageKey = queueEntry.getMessageKey
             val existing = trackingIndex.remove(messageKey)
-            if (existing == null) {
-              error("Tracking entry not found for message %d", queueEntry.getMessageKey)
-            }
-            if( addAndGet(messageRefsIndex, new jl.Long(messageKey), -1) == 0 ) {
-              // message is no longer referenced.. we can remove it..
-              removeMessage(messageKey)
+            if (existing != null) {
+              if( addAndGet(messageRefsIndex, new jl.Long(messageKey), -1) == 0 ) {
+                // message is no longer referenced.. we can remove it..
+                removeMessage(messageKey)
+              }
+            } else {
+              if( !recovering ) {
+                error("Tracking entry not found for message %d", queueEntry.getMessageKey)
+              }
             }
           } else {
-            error("Queue entry not found for seq %d", x.getQueueSeq)
+            if( !recovering ) {
+              error("Queue entry not found for seq %d", x.getQueueSeq)
+            }
           }
         } else {
-          error("Queue not found: %d", x.getQueueKey)
+          if( !recovering ) {
+            error("Queue not found: %d", x.getQueueKey)
+          }
         }
 
       case x: AddQueue.Getter =>
         val queueKey = x.getKey
         if (queueIndex.get(queueKey) == null) {
 
-          if (queueKey > databaseRootRecord.getLastQueueKey) {
-            databaseRootRecord.setLastQueueKey(queueKey)
+          if (queueKey > rootBean.getLastQueueKey) {
+            rootBean.setLastQueueKey(queueKey)
           }
 
           val queueRecord = new QueueRootRecord.Bean
@@ -790,9 +805,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         messageKeyIndex.clear
         messageRefsIndex.clear
         dataFileRefIndex.clear
-        databaseRootRecord.setLastMessageKey(0)
+        rootBean.setLastMessageKey(0)
 
-        cleanup(tx);
+        cleanup(_tx);
         info("Store purged.");
 
       case x: AddSubscription.Getter =>
@@ -870,11 +885,11 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     }
 
     // Don't GC files that we will need for recovery..
-    val upto = if (databaseRootRecord.hasFirstBatchLocation) {
-      Some(databaseRootRecord.getFirstBatchLocation.getDataFileId)
+    val upto = if (rootBuffer.hasFirstBatchLocation) {
+      Some(rootBuffer.getFirstBatchLocation.getDataFileId)
     } else {
-      if (databaseRootRecord.hasLastUpdateLocation) {
-        Some(databaseRootRecord.getLastUpdateLocation.getDataFileId)
+      if (rootBuffer.hasLastUpdateLocation) {
+        Some(rootBuffer.getLastUpdateLocation.getDataFileId)
       } else {
         None
       }
@@ -915,12 +930,12 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
   private case class Update(update: TypeCreatable, location: Location)
 
-  private class TxHelper(private val _tx: Transaction) {
-    lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, databaseRootRecord.getQueueIndexPage)
-    lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage)
-    lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageKeyIndexPage)
-    lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageRefsIndexPage)
-    lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, databaseRootRecord.getSubscriptionIndexPage)
+  private class TxHelper(val _tx: Transaction) {
+    lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, rootBuffer.getQueueIndexPage)
+    lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, rootBuffer.getDataFileRefIndexPage)
+    lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, rootBuffer.getMessageKeyIndexPage)
+    lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, rootBuffer.getMessageRefsIndexPage)
+    lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, rootBuffer.getSubscriptionIndexPage)
 
     def addAndGet[K](index:SortedIndex[K, jl.Integer], key:K, amount:Int):Int = {
       var counter = index.get(key)
@@ -949,6 +964,24 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       factory.create(_tx, rc)
       rc
     }
+
+    val rootBean = rootBuffer.copy
+
+    def lastUpdateLocation(location:Location) = {
+      rootBean.setLastUpdateLocation(location)
+    }
+
+    def updateLocations(lastUpdate: Location): Unit = {
+      rootBean.setLastUpdateLocation(lastUpdate)
+      if (batches.isEmpty) {
+        rootBean.clearFirstBatchLocation
+      } else {
+        rootBean.setFirstBatchLocation(batches.head._2._1)
+      }
+      rootBuffer = rootBean.freeze
+      _tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, rootBuffer)
+    }
+
   }
 
   private def withTx[T](func: (Transaction) => T): T = {
@@ -981,18 +1014,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   }
 
   private def isAfterLastUpdateLocation(location: Location) = {
-    val lastUpdate: Location = databaseRootRecord.getLastUpdateLocation
+    val lastUpdate: Location = rootBuffer.getLastUpdateLocation
     lastUpdate.compareTo(location) < 0
   }
 
-  private def updateLocations(tx: Transaction, lastUpdate: Location): Unit = {
-    databaseRootRecord.setLastUpdateLocation(lastUpdate)
-    if (batches.isEmpty) {
-      databaseRootRecord.clearFirstBatchLocation
-    } else {
-      databaseRootRecord.setFirstBatchLocation(batches.head._2._1)
-    }
-    tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
-    databaseRootRecord = databaseRootRecord.copy
-  }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:10:59 2010
@@ -91,7 +91,7 @@ class HawtDBStore extends Store with Bas
   }
 
   protected def _start(onCompleted: Runnable) = {
-    executor_pool = new ThreadPoolExecutor(4, 20, 1, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable](), new ThreadFactory(){
+    executor_pool = Executors.newFixedThreadPool(20, new ThreadFactory(){
       def newThread(r: Runnable) = {
         val rc = new Thread(r, "hawtdb store client")
         rc.setDaemon(true)
@@ -102,8 +102,8 @@ class HawtDBStore extends Store with Bas
     schedualDisplayStats
     executor_pool {
       client.start(^{
-        next_msg_key.set( client.databaseRootRecord.getLastMessageKey.longValue +1 )
-        next_queue_key.set( client.databaseRootRecord.getLastQueueKey.longValue +1 )
+        next_msg_key.set( client.rootBuffer.getLastMessageKey.longValue +1 )
+        next_queue_key.set( client.rootBuffer.getLastQueueKey.longValue +1 )
         onCompleted.run
       })
     }
@@ -162,11 +162,11 @@ class HawtDBStore extends Store with Bas
 
   def loadMessage(id: Long)(callback: (Option[MessageRecord]) => Unit) = {
     executor_pool ^{
-      callback( client.loadMessage(id) )
+      val rc = client.loadMessage(id)
+      callback( rc )
     }
   }
 
-
   def listQueueEntries(id: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
     executor_pool ^{
       callback( client.getQueueEntries(id) )
@@ -319,9 +319,30 @@ class HawtDBStore extends Store with Bas
 
         def rate(x:Long, y:Long):Float = ((y-x)*1000.0f)/TimeUnit.NANOSECONDS.toMillis(et-st)
 
+        val m1 = rate(ss._1,es._1)
+        val m2 = rate(ss._2,es._2)
+        val m3 = rate(ss._3,es._3)
+        val m4 = rate(ss._4,es._4)
+
+        if( m1>0f || m2>0f || m3>0f || m4>0f ) {
+          info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f, store latency: %,.3f",
+            m1, m2, m3, m3, avgCommitLatency, storeLatency(true).avgTime(TimeUnit.MILLISECONDS) )
+        }
+
+
+        def display(name:String, counter:TimeCounter) {
+          var t = counter.apply(true)
+          if( t.count > 0 ) {
+            info("%s latency in ms: avg: %,.3f, max: %,.3f, min: %,.3f", name, t.avgTime(TimeUnit.MILLISECONDS), t.maxTime(TimeUnit.MILLISECONDS), t.minTime(TimeUnit.MILLISECONDS))
+          }
+        }
+
+//        display("total msg load", loadMessageTimer)
+//        display("index read", client.indexLoad)
+//        display("toal journal load", client.journalLoad)
+//        display("journal read", client.journalRead)
+//        display("journal decode", client.journalDecode)
 
-        info("metrics: cancled: { messages: %,.3f, enqeues: %,.3f }, flushed: { messages: %,.3f, enqeues: %,.3f }, commit latency: %,.3f, store latency: %,.3f",
-          rate(ss._1,es._1), rate(ss._2,es._2), rate(ss._3,es._3), rate(ss._4,es._4), avgCommitLatency, storeLatency(true).avgTime(TimeUnit.MILLISECONDS) )
         schedualDisplayStats
       }
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul  7 04:10:59 2010
@@ -178,7 +178,7 @@ object Helpers {
     if (t == null) {
       throw new IOException("Could not load journal record. Invalid type at location: " + location);
     }
-    t.parseUnframed(value).asInstanceOf[TypeCreatable]
+    t.parseFramed(value).asInstanceOf[TypeCreatable]
   }
 
   //

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=961147&r1=961146&r2=961147&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Wed Jul  7 04:10:59 2010
@@ -144,20 +144,28 @@ abstract class StoreBenchmarkSupport ext
     msgKeys
   }
 
-  test("store enqueue latencey") {
+  test("store enqueue and load latencey") {
     val A = addQueue("A")
-    var seq = 0
+    var messageKeys = storeMessages(A)
+    loadMessages(A, messageKeys)
+  }
+
+  def storeMessages(queue:Long) = {
+
+    var seq = 0L
+    var messageKeys = ListBuffer[Long]()
 
     val content = payload("message\n", 1024)
-    val metric = benchmark {
+    var metric = benchmarkCount(100000) {
       seq += 1
 
       var batch = store.createStoreBatch
       val message = addMessage(batch, content)
-      batch.enqueue(entry(A, seq, message))
+      messageKeys += message
+      batch.enqueue(entry(queue, seq, message))
 
       val latch = new CountDownLatch(1)
-      batch.setDisposer(^{cd(latch)} )
+      batch.setDisposer(^{latch.countDown} )
       batch.release
       store.flushMessage(message) {}
 
@@ -167,12 +175,27 @@ abstract class StoreBenchmarkSupport ext
     println("enqueue metrics: "+metric)
     println("enqueue latency is: "+metric.latency(TimeUnit.MILLISECONDS)+" ms")
     println("enqueue rate is: "+metric.rate(TimeUnit.SECONDS)+" enqueues/s")
+    messageKeys.toList
   }
 
-  def cd(latch:CountDownLatch) = {
-    latch.countDown
-  }
+  def loadMessages(queue:Long, messageKeys: List[Long]) = {
+    
+    var keys = messageKeys.toList
+    val metric = benchmarkCount(keys.size) {
+      val latch = new CountDownLatch(1)
+      store.loadMessage(keys.head) { msg=>
+        assert(msg.isDefined, "message key not found: "+keys.head)
+        latch.countDown
+      }
+      latch.await
+      keys = keys.drop(1)
+    }
+
+    println("load metrics: "+metric)
+    println("load latency is: "+metric.latency(TimeUnit.MILLISECONDS)+" ms")
+    println("load rate is: "+metric.rate(TimeUnit.SECONDS)+" loads/s")
 
+  }
 
   case class Metric(count:Long, duration:Long) {
     def latency(unit:TimeUnit) = {
@@ -183,14 +206,20 @@ abstract class StoreBenchmarkSupport ext
     }
   }
 
-  def benchmark(func: =>Unit ) = {
+  def benchmarkFor(duration:Int)(func: =>Unit ) = {
 
     val counter = new AtomicLong()
     val done = new AtomicBoolean()
+    val warmup = new AtomicBoolean(true)
+
     var startT = 0L
     var endT = 0L
     val thread = new Thread("benchmarked task") {
+
       override def run = {
+        while(warmup.get) {
+          func
+        }
         startT = System.nanoTime();
         while(!done.get) {
           func
@@ -201,12 +230,24 @@ abstract class StoreBenchmarkSupport ext
     }
 
     thread.start()
-    Thread.sleep(1000*30)
+
+    Thread.sleep(1000*5)
+    warmup.set(false)
+    Thread.sleep(1000*duration)
     done.set(true)
     thread.join
 
     Metric(counter.get, endT-startT)
   }
 
-
+  def benchmarkCount(iterations:Int)(func: =>Unit ) = {
+    val startT = System.nanoTime();
+    var i = 0
+    while( i < iterations) {
+      func
+      i += 1
+    }
+    val endT = System.nanoTime();
+    Metric(iterations, endT-startT)
+  }
 }