You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/06 21:29:15 UTC
svn commit: r1042779 - in /activemq/activemq-apollo/trunk:
apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/
apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broke...
Author: chirino
Date: Mon Dec 6 20:29:14 2010
New Revision: 1042779
URL: http://svn.apache.org/viewvc?rev=1042779&view=rev
Log:
- Fixed unbounded draining of UOWs sent to the stores, which could cause GC overhead exceptions.
- BDB store now uses multiple read threads to increase read throughput.
- Tuning prefetch handling.
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala?rev=1042779&r1=1042778&r2=1042779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/BDBStore.scala Mon Dec 6 20:29:14 2010
@@ -68,7 +68,8 @@ class BDBStore extends DelayingStoreSupp
var next_queue_key = new AtomicLong(1)
var next_msg_key = new AtomicLong(1)
- var executor_pool:ExecutorService = _
+ var write_executor:ExecutorService = _
+ var read_executor:ExecutorService = _
var config:BDBStoreDTO = defaultConfig
val client = new BDBClient(this)
@@ -79,7 +80,7 @@ class BDBStore extends DelayingStoreSupp
protected def get_next_msg_key = next_msg_key.getAndIncrement
protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
- executor_pool {
+ write_executor {
client.store(uows, ^{
dispatchQueue {
callback
@@ -103,16 +104,23 @@ class BDBStore extends DelayingStoreSupp
protected def _start(onCompleted: Runnable) = {
info("Starting bdb store at: '%s'", config.directory)
- executor_pool = Executors.newFixedThreadPool(1, new ThreadFactory(){
+ write_executor = Executors.newFixedThreadPool(1, new ThreadFactory(){
def newThread(r: Runnable) = {
- val rc = new Thread(r, toString+" io")
+ val rc = new Thread(r, "bdb store io write")
+ rc.setDaemon(true)
+ rc
+ }
+ })
+ read_executor = Executors.newFixedThreadPool(config.read_threads.getOrElse(10), new ThreadFactory(){
+ def newThread(r: Runnable) = {
+ val rc = new Thread(r, "bdb store io read")
rc.setDaemon(true)
rc
}
})
client.config = config
poll_stats
- executor_pool {
+ write_executor {
client.start()
next_msg_key.set( client.getLastMessageKey +1 )
next_queue_key.set( client.getLastQueueKey +1 )
@@ -124,9 +132,12 @@ class BDBStore extends DelayingStoreSupp
new Thread() {
override def run = {
info("Stopping BDB store at: '%s'", config.directory)
- executor_pool.shutdown
- executor_pool.awaitTermination(86400, TimeUnit.SECONDS)
- executor_pool = null
+ write_executor.shutdown
+ write_executor.awaitTermination(86400, TimeUnit.SECONDS)
+ write_executor = null
+ read_executor.shutdown
+ read_executor.awaitTermination(86400, TimeUnit.SECONDS)
+ read_executor = null
client.stop
onCompleted.run
}
@@ -143,7 +154,7 @@ class BDBStore extends DelayingStoreSupp
* Deletes all stored data from the store.
*/
def purge(callback: =>Unit) = {
- executor_pool {
+ write_executor {
client.purge()
next_queue_key.set(1)
next_msg_key.set(1)
@@ -156,31 +167,31 @@ class BDBStore extends DelayingStoreSupp
* Gets the last queue key identifier stored.
*/
def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
- executor_pool {
+ write_executor {
callback(Some(client.getLastQueueKey))
}
}
def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
- executor_pool {
+ write_executor {
client.addQueue(record, ^{ callback(true) })
}
}
def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
- executor_pool {
+ write_executor {
client.removeQueue(queueKey,^{ callback(true) })
}
}
def getQueue(queueKey: Long)(callback: (Option[QueueRecord]) => Unit) = {
- executor_pool {
+ write_executor {
callback( client.getQueue(queueKey) )
}
}
def listQueues(callback: (Seq[Long]) => Unit) = {
- executor_pool {
+ write_executor {
callback( client.listQueues )
}
}
@@ -202,19 +213,19 @@ class BDBStore extends DelayingStoreSupp
def drain_loads = {
var data = load_source.getData
message_load_batch_size_counter += data.size
- executor_pool ^{
+ read_executor ^{
client.loadMessages(data)
}
}
def listQueueEntryRanges(queueKey: Long, limit: Int)(callback: (Seq[QueueEntryRange]) => Unit) = {
- executor_pool ^{
+ write_executor ^{
callback( client.listQueueEntryGroups(queueKey, limit) )
}
}
def listQueueEntries(queueKey: Long, firstSeq: Long, lastSeq: Long)(callback: (Seq[QueueEntryRecord]) => Unit) = {
- executor_pool ^{
+ write_executor ^{
callback( client.getQueueEntries(queueKey, firstSeq, lastSeq) )
}
}
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java?rev=1042779&r1=1042778&r2=1042779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/store/bdb/dto/BDBStoreDTO.java Mon Dec 6 20:29:14 2010
@@ -34,8 +34,7 @@ public class BDBStoreDTO extends StoreDT
@XmlAttribute
public File directory;
-// @XmlAttribute(name="archive-directory")
-// public File archive_directory;
-//
+ @XmlAttribute(name = "read-threads")
+ public Integer read_threads;
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1042779&r1=1042778&r2=1042779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Dec 6 20:29:14 2010
@@ -154,7 +154,6 @@ class Queue(val host: VirtualHost, var i
def completed: Unit = {
// by the time this is run, consumers and producers may have already joined.
onCompleted.run
- display_stats
schedual_consumer_sample
// wake up the producers to fill us up...
if (messages.refiller != null) {
@@ -261,7 +260,14 @@ class Queue(val host: VirtualHost, var i
// try to dispatch it directly...
entry.dispatch
}
- trigger_swap
+
+ val prev = entry.getPrevious
+
+ if( (prev.as_loaded!=null && prev.as_loaded.flushing) || (prev.as_flushed!=null && !prev.as_flushed.loading) ) {
+ entry.flush(!entry.as_loaded.acquired)
+ } else {
+ trigger_swap
+ }
// release the store batch...
if (queueDelivery.uow != null) {
@@ -331,10 +337,14 @@ class Queue(val host: VirtualHost, var i
// swap out messages.
cur = entries.getHead
while( cur!=null ) {
- if( cur.is_loaded && cur.prefetch_flags==0 && !cur.as_loaded.acquired ) {
- val flush_asap = !cur.as_loaded.acquired
-// display_active_entries
- cur.flush(flush_asap)
+ val loaded = cur.as_loaded
+ if( loaded!=null ) {
+ if( cur.prefetch_flags==0 && !loaded.acquired ) {
+ val flush_asap = !cur.as_loaded.acquired
+ cur.flush(flush_asap)
+ } else {
+ cur.load // just in case it's getting flushed.
+ }
}
cur = cur.getNext
}
@@ -382,19 +392,26 @@ class Queue(val host: VirtualHost, var i
// target tune_min_subscription_rate / sec
all_subscriptions.foreach{ case (consumer, sub)=>
- sub.advanced_sizes += {
- if( sub.tail_parkings > 0 ) {
- sub.advanced_size.max(1024*1024*20)
- } else {
- sub.advanced_size
- }
+
+ val advanced = if ( sub.tail_parkings > 0 ) {
+ // guesstimate what full speed would have been.
+ sub.advanced_size.max(sub.best_advanced_size)
+ } else {
+ sub.advanced_size
}
- sub.tail_parkings = 0
+
+ // keep track of the last few advance sizes..
+ sub.advanced_sizes += advanced
while( sub.advanced_sizes.size > 10 ) {
sub.advanced_sizes = sub.advanced_sizes.drop(1)
}
+
+ sub.best_advanced_size = sub.advanced_sizes.foldLeft(0)(_ max _)
+
sub.total_advanced_size += sub.advanced_size
sub.advanced_size = 0
+ sub.tail_parkings = 0
+
}
@@ -1189,9 +1206,7 @@ class QueueEntry(val queue:Queue, val se
val size_delta: Int = size - size_count
if ( item_delta!=0 || size_delta!=0 ) {
- assert(item_delta <= 0)
- assert(size_delta <= 0)
- info("Detected store dropped %d message(s) in seq range %d to %d using %d bytes", item_delta, seq, last, size_delta)
+ info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta)
queue.enqueue_item_counter += item_delta
queue.enqueue_size_counter += size_delta
}
@@ -1258,16 +1273,11 @@ class Subscription(queue:Queue) extends
var total_advanced_size = 0L
var advanced_size = 0
- var advanced_sizes = ListBuffer[Int](1024*1024*20) // use circular buffer instead.
+ var advanced_sizes = ListBuffer[Int]() // use circular buffer instead.
+ var best_advanced_size = queue.tune_consumer_buffer * 100
var tail_parkings = 1
- var best_advanced_size = if(advanced_sizes.isEmpty) {
- 0
- } else {
- advanced_sizes.foldLeft(0)(_ max _)
- }
-
override def toString = {
def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
"{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+"}"
@@ -1367,12 +1377,7 @@ class Subscription(queue:Queue) extends
next = next.getNext
}
- remaining = if(tail_parkings > 0) {
- queue.tune_consumer_buffer * 100
- } else {
- best_advanced_size * 10
- }
-
+ remaining = best_advanced_size
while( remaining>0 && next!=null ) {
remaining -= next.size
next.prefetch_flags |= 2
Modified: activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala?rev=1042779&r1=1042778&r2=1042779&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-store/src/main/scala/org/apache/activemq/apollo/store/DelayingStoreSupport.scala Mon Dec 6 20:29:14 2010
@@ -293,7 +293,9 @@ trait DelayingStoreSupport extends Store
if( !uows.isEmpty ) {
flush_latency_counter.start { end=>
+ flush_source.suspend
store(uows) {
+ flush_source.resume
end()
uows.foreach { uow=>