You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/06 21:29:15 UTC

svn commit: r1042779 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/ apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broke...

Author: chirino
Date: Mon Dec  6 20:29:14 2010
New Revision: 1042779

URL: http://svn.apache.org/viewvc?rev=1042779&view=rev
Log:
- Fixed unbounded draining of UOW sent to the stores... could cause GC overhead exceptions
- BDB store now uses multiple read threads to increase read throughput. 
- Tuning prefetch handling.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala?rev=1042779&r1=1042778&r2=1042779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala Mon Dec  6 20:29:14 2010
@@ -68,7 +68,8 @@ class BDBStore extends DelayingStoreSupp
   var next_queue_key = new AtomicLong(1)
   var next_msg_key = new AtomicLong(1)
 
-  var executor_pool:ExecutorService = _
+  var write_executor:ExecutorService = _
+  var read_executor:ExecutorService = _
   var config:BDBStoreDTO = defaultConfig
   val client = new BDBClient(this)
 
@@ -79,7 +80,7 @@ class BDBStore extends DelayingStoreSupp
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
-    executor_pool {
+    write_executor {
       client.store(uows, ^{
         dispatchQueue {
           callback
@@ -103,16 +104,23 @@ class BDBStore extends DelayingStoreSupp
 
   protected def _start(onCompleted: Runnable) = {
     info("Starting bdb store at: '%s'", config.directory)
-    executor_pool = Executors.newFixedThreadPool(1, new ThreadFactory(){
+    write_executor = Executors.newFixedThreadPool(1, new ThreadFactory(){
       def newThread(r: Runnable) = {
-        val rc = new Thread(r, toString+" io")
+        val rc = new Thread(r, "bdb store io write")
+        rc.setDaemon(true)
+        rc
+      }
+    })
+    read_executor = Executors.newFixedThreadPool(config.read_threads.getOrElse(10), new ThreadFactory(){
+      def newThread(r: Runnable) = {
+        val rc = new Thread(r, "bdb store io read")
         rc.setDaemon(true)
         rc
       }
     })
     client.config = config
     poll_stats
-    executor_pool {
+    write_executor {
       client.start()
       next_msg_key.set( client.getLastMessageKey +1 )
       next_queue_key.set( client.getLastQueueKey +1 )
@@ -124,9 +132,12 @@ class BDBStore extends DelayingStoreSupp
     new Thread() {
       override def run = {
         info("Stopping BDB store at: '%s'", config.directory)
-        executor_pool.shutdown
-        executor_pool.awaitTermination(86400, TimeUnit.SECONDS)
-        executor_pool = null
+        write_executor.shutdown
+        write_executor.awaitTermination(86400, TimeUnit.SECONDS)
+        write_executor = null
+        read_executor.shutdown
+        read_executor.awaitTermination(86400, TimeUnit.SECONDS)
+        read_executor = null
         client.stop
         onCompleted.run
       }
@@ -143,7 +154,7 @@ class BDBStore extends DelayingStoreSupp
    * Deletes all stored data from the store.
    */
   def purge(callback: =>Unit) = {
-    executor_pool {
+    write_executor {
       client.purge()
       next_queue_key.set(1)
       next_msg_key.set(1)
@@ -156,31 +167,31 @@ class BDBStore extends DelayingStoreSupp
    * Ges the last queue key identifier stored.
    */
   def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
-    executor_pool {
+    write_executor {
       callback(Some(client.getLastQueueKey))
     }
   }
 
   def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
-    executor_pool {
+    write_executor {
      client.addQueue(record, ^{ callback(true) })
     }
   }
 
   def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
-    executor_pool {
+    write_executor {
       client.removeQueue(queueKey,^{ callback(true) })
     }
   }
 
   def getQueue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
-    executor_pool {
+    write_executor {
       callback( client.getQueue(queueKey) )
     }
   }
 
   def listQueues(callback: (Seq[Long]) => Unit) = {
-    executor_pool {
+    write_executor {
       callback( client.listQueues )
     }
   }
@@ -202,19 +213,19 @@ class BDBStore extends DelayingStoreSupp
   def drain_loads = {
     var data = load_source.getData
     message_load_batch_size_counter += data.size
-    executor_pool ^{
+    read_executor ^{
       client.loadMessages(data)
     }
   }
 
   def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
-    executor_pool ^{
+    write_executor ^{
       callback( client.listQueueEntryGroups(queueKey, limit) )
     }
   }
 
   def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
-    executor_pool ^{
+    write_executor ^{
       callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java?rev=1042779&r1=1042778&r2=1042779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java Mon Dec  6 20:29:14 2010
@@ -34,8 +34,7 @@ public class BDBStoreDTO extends StoreDT
     @XmlAttribute
     public File directory;
 
-//    @XmlAttribute(name="archive-directory")
-//    public File archive_directory;
-//
+    @XmlAttribute(name = "read-threads")
+    public Integer read_threads;
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1042779&r1=1042778&r2=1042779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Dec  6 20:29:14 2010
@@ -154,7 +154,6 @@ class Queue(val host: VirtualHost, var i
     def completed: Unit = {
       // by the time this is run, consumers and producers may have already joined.
       onCompleted.run
-      display_stats
       schedual_consumer_sample
       // wake up the producers to fill us up...
       if (messages.refiller != null) {
@@ -261,7 +260,14 @@ class Queue(val host: VirtualHost, var i
           // try to dispatch it directly...
           entry.dispatch
         }
-        trigger_swap
+
+        val prev = entry.getPrevious
+
+        if( (prev.as_loaded!=null && prev.as_loaded.flushing) || (prev.as_flushed!=null && !prev.as_flushed.loading) ) {
+          entry.flush(!entry.as_loaded.acquired)
+        } else {
+          trigger_swap
+        }
 
         // release the store batch...
         if (queueDelivery.uow != null) {
@@ -331,10 +337,14 @@ class Queue(val host: VirtualHost, var i
     // swap out messages.
     cur = entries.getHead
     while( cur!=null ) {
-      if( cur.is_loaded && cur.prefetch_flags==0 && !cur.as_loaded.acquired ) {
-        val flush_asap = !cur.as_loaded.acquired
-//        display_active_entries
-        cur.flush(flush_asap)
+      val loaded = cur.as_loaded
+      if( loaded!=null ) {
+        if( cur.prefetch_flags==0 && !loaded.acquired  ) {
+          val flush_asap = !cur.as_loaded.acquired
+          cur.flush(flush_asap)
+        } else {
+          cur.load // just in case it's getting flushed.
+        }
       }
       cur = cur.getNext
     }
@@ -382,19 +392,26 @@ class Queue(val host: VirtualHost, var i
 
         // target tune_min_subscription_rate / sec
         all_subscriptions.foreach{ case (consumer, sub)=>
-          sub.advanced_sizes += {
-            if( sub.tail_parkings > 0  ) {
-              sub.advanced_size.max(1024*1024*20)
-            } else {
-              sub.advanced_size
-            }
+          
+          val advanced = if ( sub.tail_parkings > 0 ) {
+            // guesstimate what full speed would have been.
+            sub.advanced_size.max(sub.best_advanced_size)
+          } else {
+            sub.advanced_size
           }
-          sub.tail_parkings = 0
+          
+          // keep track of the last few advance sizes..
+          sub.advanced_sizes += advanced
           while( sub.advanced_sizes.size > 10 ) {
             sub.advanced_sizes = sub.advanced_sizes.drop(1)
           }
+
+          sub.best_advanced_size = sub.advanced_sizes.foldLeft(0)(_ max _)
+          
           sub.total_advanced_size += sub.advanced_size
           sub.advanced_size = 0
+          sub.tail_parkings = 0
+          
 
         }
 
@@ -1189,9 +1206,7 @@ class QueueEntry(val queue:Queue, val se
             val size_delta: Int = size - size_count
 
             if ( item_delta!=0 || size_delta!=0 ) {
-              assert(item_delta <= 0)
-              assert(size_delta <= 0)
-              info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", item_delta, seq, last, size_delta)
+              info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta)
               queue.enqueue_item_counter += item_delta
               queue.enqueue_size_counter += size_delta
             }
@@ -1258,16 +1273,11 @@ class Subscription(queue:Queue) extends 
 
   var total_advanced_size = 0L
   var advanced_size = 0
-  var advanced_sizes = ListBuffer[Int](1024*1024*20) // use circular buffer instead.
+  var advanced_sizes = ListBuffer[Int]() // use circular buffer instead.
 
+  var best_advanced_size = queue.tune_consumer_buffer * 100
   var tail_parkings = 1
 
-  var best_advanced_size = if(advanced_sizes.isEmpty) {
-    0
-  } else {
-    advanced_sizes.foldLeft(0)(_ max _)
-  }
-
   override def toString = {
     def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
     "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+"}"
@@ -1367,12 +1377,7 @@ class Subscription(queue:Queue) extends 
       next = next.getNext
     }
 
-    remaining = if(tail_parkings > 0) {
-      queue.tune_consumer_buffer * 100
-    } else {
-      best_advanced_size * 10
-    }
-
+    remaining = best_advanced_size 
     while( remaining>0 && next!=null ) {
       remaining -= next.size
       next.prefetch_flags |= 2

Modified: activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala?rev=1042779&r1=1042778&r2=1042779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala Mon Dec  6 20:29:14 2010
@@ -293,7 +293,9 @@ trait DelayingStoreSupport extends Store
 
     if( !uows.isEmpty ) {
       flush_latency_counter.start { end=>
+        flush_source.suspend
         store(uows) {
+          flush_source.resume
           end()
           uows.foreach { uow=>