You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/02 13:26:04 UTC
svn commit: r1379944 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/
Author: chirino
Date: Sun Sep 2 11:26:04 2012
New Revision: 1379944
URL: http://svn.apache.org/viewvc?rev=1379944&view=rev
Log:
The queue stats on topic queue could become inconsistent. Also avoid a small edge case where a dequeue on a swapped entry would get dropped if it became part of a swap range.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1379944&r1=1379943&r2=1379944&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sun Sep 2 11:26:04 2012
@@ -30,6 +30,7 @@ import security.{SecuredResource, Securi
import org.apache.activemq.apollo.dto._
import java.util.regex.Pattern
import collection.mutable.ListBuffer
+import java.util
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
@@ -119,6 +120,12 @@ class Queue(val router: LocalRouter, val
var tune_swap = true
/**
+ * Todo.. see if we can remove this collection. Don't think it's
+ * actually need.
+ */
+ val in_flight_removes = new util.HashSet[Long]()
+
+ /**
* The number max number of swapped queue entries to load
* for the store at a time. Note that swapped entries are just
* reference pointers to the actual messages. When not loaded,
@@ -595,19 +602,13 @@ class Queue(val router: LocalRouter, val
case state: entry.Loaded =>
var next = entry.getNext
if (!entry.is_acquired) {
- dequeue_item_counter += 1
- dequeue_size_counter += entry.size
- dequeue_ts = now
- entry.remove
+ entry.dequeue(null)
}
next
case state: entry.Swapped =>
var next = entry.getNext
if (!entry.is_acquired) {
- dequeue_item_counter += 1
- dequeue_size_counter += entry.size
- dequeue_ts = now
- entry.remove
+ entry.dequeue(null)
}
next
case state: entry.SwappedRange =>
@@ -665,12 +666,13 @@ class Queue(val router: LocalRouter, val
tail_entry = new QueueEntry(Queue.this, next_message_seq)
val queue_delivery = delivery.copy
queue_delivery.seq = entry.seq
- entry.init(queue_delivery)
-
+
if( tune_persistent ) {
queue_delivery.uow = delivery.uow
}
+ entry.init(queue_delivery)
+
entries.addLast(entry)
enqueue_item_counter += 1
enqueue_size_counter += entry.size
@@ -679,15 +681,6 @@ class Queue(val router: LocalRouter, val
// To decrease the enqueue throttle.
enqueue_remaining_take(entry.size)
- // Do we need to do a persistent enqueue???
- val persisted = queue_delivery.uow != null
- if (persisted) {
- entry.state match {
- case state:entry.Loaded => state.store
- case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord)
- }
- }
-
if( entry.hasSubs ) {
// try to dispatch it directly...
entry.dispatch
@@ -697,7 +690,7 @@ class Queue(val router: LocalRouter, val
if( entry.isLinked ) {
if( !consumers_keeping_up_historically ) {
entry.swap(true)
- } else if( entry.as_loaded.is_acquired && persisted) {
+ } else if( entry.as_loaded.is_acquired && queue_delivery.uow != null) {
// If the message as dispatched and it's marked to get persisted anyways,
// then it's ok if it falls out of memory since we won't need to load it again.
entry.swap(false)
@@ -705,7 +698,7 @@ class Queue(val router: LocalRouter, val
}
// release the store batch...
- if (persisted) {
+ if (queue_delivery.uow != null) {
queue_delivery.uow.release
queue_delivery.uow = null
}
@@ -726,17 +719,10 @@ class Queue(val router: LocalRouter, val
}
def expired(entry:QueueEntry, dequeue:Boolean=true):Unit = {
- if(dequeue) {
- might_unfill {
- dequeue_item_counter += 1
- dequeue_size_counter += entry.size
- dequeue_ts = now
- }
- }
-
expired_ts = now
expired_item_counter += 1
expired_size_counter += entry.size
+ entry.dequeue(null)
}
def display_stats: Unit = {
@@ -806,14 +792,12 @@ class Queue(val router: LocalRouter, val
// acquired.
if( !x.is_acquired ) {
expired(cur)
- x.remove
}
case x:QueueEntry#Loaded =>
// remove the expired message if it has not been
// acquired.
if( !x.is_acquired ) {
expired(cur)
- x.remove
}
case _ =>
}
@@ -833,36 +817,51 @@ class Queue(val router: LocalRouter, val
}
// swap out messages.
- cur = entries.getHead
+ cur = entries.getHead.getNext
+ var dropping_head_entries = is_topic_queue
while( cur!=null ) {
val next = cur.getNext
- if( cur.prefetched ) {
- // Prefteched entries need to get loaded..
- cur.load(consumer_swapped_in)
- } else {
- // This is a non-prefetched entry.. entires ahead and behind the
- // consumer subscriptions.
- val loaded = cur.as_loaded
- if( loaded!=null ) {
- // It's in memory.. perhaps we need to swap it out..
- if(!consumers_keeping_up_historically) {
- // Swap out ASAP if consumers are not keeping up..
- cur.swap(true)
+ if ( dropping_head_entries ) {
+ if( cur.parked.isEmpty ) {
+ if( cur.is_swapped_range ) {
+ cur.load(producer_swapped_in)
+ dropping_head_entries=false
} else {
- // Consumers seem to be keeping up.. so we have to be more selective
- // about what gets swapped out..
-
- if (cur.memory_space eq producer_swapped_in ) {
- // Entry will be used soon..
- cur.load(producer_swapped_in)
- } else if ( cur.is_acquired ) {
- // Entry was just used...
- cur.load(consumer_swapped_in)
-// cur.swap(false)
- } else {
- // Does not look to be anywhere close to the consumer.. so get
- // rid of it asap.
+ cur.dequeue(null)
+ }
+ } else {
+ cur.load(consumer_swapped_in)
+ dropping_head_entries = false
+ }
+ } else {
+ if( cur.prefetched ) {
+ // Prefteched entries need to get loaded..
+ cur.load(consumer_swapped_in)
+ } else {
+ // This is a non-prefetched entry.. entires ahead and behind the
+ // consumer subscriptions.
+ val loaded = cur.as_loaded
+ if( loaded!=null ) {
+ // It's in memory.. perhaps we need to swap it out..
+ if(!consumers_keeping_up_historically) {
+ // Swap out ASAP if consumers are not keeping up..
cur.swap(true)
+ } else {
+ // Consumers seem to be keeping up.. so we have to be more selective
+ // about what gets swapped out..
+
+ if (cur.memory_space eq producer_swapped_in ) {
+ // Entry will be used soon..
+ cur.load(producer_swapped_in)
+ } else if ( cur.is_acquired ) {
+ // Entry was just used...
+ cur.load(consumer_swapped_in)
+ // cur.swap(false)
+ } else {
+ // Does not look to be anywhere close to the consumer.. so get
+ // rid of it asap.
+ cur.swap(true)
+ }
}
}
}
@@ -1052,12 +1051,11 @@ class Queue(val router: LocalRouter, val
case (entry, consumed, uow) =>
consumed match {
case Consumed =>
-// debug("ack consumed: ("+store_id+","+entry.entry.seq+")")
entry.ack(uow)
case Expired=>
// debug("ack expired: ("+store_id+","+entry.entry.seq+")")
entry.entry.queue.expired(entry.entry, false)
- entry.ack(uow)
+ entry.remove()
case Delivered =>
entry.increment_nack
entry.entry.redelivered
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1379944&r1=1379943&r2=1379944&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Sun Sep 2 11:26:04 2012
@@ -64,15 +64,25 @@ class QueueEntry(val queue:Queue, val se
}
def init(delivery:Delivery):QueueEntry = {
+
if( delivery.message == null ) {
// This must be a swapped out message which has been previously persisted in
// another queue. We need to enqueue it to this queue..
queue.swap_out_size_counter += delivery.size
queue.swap_out_item_counter += 1
state = new Swapped(delivery.storeKey, delivery.storeLocator, delivery.size, delivery.expiration, 0, null, delivery.sender)
+ // store it..
+ if( delivery.uow != null ) {
+ delivery.uow.enqueue(toQueueEntryRecord)
+ }
} else {
queue.producer_swapped_in += delivery
- state = new Loaded(delivery, false, queue.producer_swapped_in)
+ val loaded: QueueEntry.this.type#Loaded = new Loaded(delivery, false, queue.producer_swapped_in)
+ state = loaded
+ // store it..
+ if( delivery.uow != null ) {
+ loaded.store
+ }
}
this
}
@@ -195,6 +205,7 @@ class QueueEntry(val queue:Queue, val se
// These methods may cause a change in the current state.
def swap(asap:Boolean) = state.swap_out(asap)
def load(space:MemorySpace) = state.swap_in(space)
+ def dequeue(uow:StoreUOW) = state.dequeue(uow)
def remove = state.remove
def swapped_range = state.swap_range
@@ -290,14 +301,48 @@ class QueueEntry(val queue:Queue, val se
* Removes the entry from the queue's linked list of entries. This gets called
* as a result of an acquired ack.
*/
- def remove:Unit = {
+ def dequeue(uow:StoreUOW):Unit = {
+ if (messageKey != -1) {
+ val localuow = if( uow == null ) {
+ queue.virtual_host.store.create_uow
+ } else {
+ uow
+ }
+ localuow.dequeue(entry.toQueueEntryRecord)
+ remove()
+ queue.in_flight_removes.add(seq)
+ localuow.on_complete {
+ queue.dispatch_queue {
+ queue.in_flight_removes.remove(seq)
+ queue.might_unfill {
+ queue.dequeue_item_counter += 1
+ queue.dequeue_size_counter += size
+ queue.dequeue_ts = queue.now
+ }
+ }
+ }
+ if( uow == null ) {
+ localuow.release
+ }
+ } else {
+ queue.might_unfill {
+ remove()
+ queue.dequeue_item_counter += 1
+ queue.dequeue_size_counter += size
+ queue.dequeue_ts = queue.now
+ }
+ }
+ }
+
+ def remove():Unit = {
// advance subscriptions that were on this entry..
- advance(parked)
- parked = Nil
+ if( !parked.isEmpty ) {
+ advance(parked)
+ parked = Nil
+ }
// take the entry of the entries list..
unlink
- //TODO: perhaps refill subscriptions.
}
/**
@@ -337,7 +382,7 @@ class QueueEntry(val queue:Queue, val se
}
}
- override def remove = throw new AssertionError("Head entry cannot be removed")
+ override def dequeue(uow:StoreUOW) = throw new AssertionError("Head entry cannot be removed")
override def swap_in(space:MemorySpace) = throw new AssertionError("Head entry cannot be loaded")
override def swap_out(asap:Boolean) = throw new AssertionError("Head entry cannot be swapped")
}
@@ -353,7 +398,7 @@ class QueueEntry(val queue:Queue, val se
override def toString = "tail"
override def as_tail:Tail = this
- override def remove = throw new AssertionError("Tail entry cannot be removed")
+ override def dequeue(uow:StoreUOW) = throw new AssertionError("Tail entry cannot be removed")
override def swap_in(space:MemorySpace) = throw new AssertionError("Tail entry cannot be loaded")
override def swap_out(asap:Boolean) = throw new AssertionError("Tail entry cannot be swapped")
@@ -401,7 +446,7 @@ class QueueEntry(val queue:Queue, val se
override def redelivered = delivery.redeliveries = ((delivery.redeliveries+1).min(Short.MaxValue)).toShort
- var remove_pending = false
+ var remove_pending:Option[StoreUOW] = None
override def is_swapped_or_swapping_out = {
swapping_out
@@ -489,11 +534,11 @@ class QueueEntry(val queue:Queue, val se
if( can_combine_with_prev ) {
getPrevious.as_swapped_range.combineNext
}
- if( remove_pending ) {
- state.remove
- } else {
- queue.loaded_items -= 1
- queue.loaded_size -= size
+ queue.loaded_items -= 1
+ queue.loaded_size -= size
+
+ if( remove_pending.isDefined ) {
+ state.dequeue(remove_pending.get)
}
val on_swap_out_copy = on_swap_out
@@ -503,10 +548,10 @@ class QueueEntry(val queue:Queue, val se
}
} else {
- if( remove_pending ) {
+ if( remove_pending.isDefined ) {
delivery.message.release
space -= delivery
- super.remove
+ super.dequeue(remove_pending.get)
}
}
}
@@ -520,25 +565,28 @@ class QueueEntry(val queue:Queue, val se
swapping_out = false
}
- override def remove = {
- queue.loaded_items -= 1
- queue.loaded_size -= size
- if( storing | remove_pending ) {
- remove_pending = true
+ override def dequeue(uow:StoreUOW) = {
+ if( storing | remove_pending.isDefined ) {
+ remove_pending = Some(uow)
} else {
- delivery.message.release
- space -= delivery
- super.remove
+ super.dequeue(uow)
}
}
+ override def remove() = {
+ queue.loaded_items -= 1
+ queue.loaded_size -= size
+ delivery.message.release
+ space -= delivery
+ super.remove()
+ }
+
override def dispatch():Boolean = {
queue.assert_executing
if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
queue.expired(entry)
- remove
return true
}
@@ -647,15 +695,7 @@ class QueueEntry(val queue:Queue, val se
// We can drop after dispatch in some cases.
if( queue.is_topic_queue && parked.isEmpty && getPrevious.is_head ) {
- if (messageKey != -1) {
- val storeBatch = queue.virtual_host.store.create_uow
- storeBatch.dequeue(toQueueEntryRecord)
- storeBatch.release
- }
- queue.dequeue_item_counter += 1
- queue.dequeue_size_counter += size
- queue.dequeue_ts = queue.now
- remove
+ dequeue(null)
}
queue.trigger_swap
@@ -763,14 +803,18 @@ class QueueEntry(val queue:Queue, val se
}
}
-
- override def remove = {
+ override def dequeue(uow:StoreUOW) = {
if( space!=null ) {
space = null
queue.swapping_in_size -= size
}
+ super.dequeue(uow)
+ }
+
+
+ override def remove() {
queue.individual_swapped_items -= 1
- super.remove
+ super.remove()
}
override def swap_range = {
@@ -789,7 +833,6 @@ class QueueEntry(val queue:Queue, val se
if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
queue.expired(entry)
- remove
return true
}
@@ -891,9 +934,11 @@ class QueueEntry(val queue:Queue, val se
val tmpList = new LinkedNodeList[QueueEntry]()
records.foreach { record =>
val entry = new QueueEntry(queue, record.entry_seq).init(record)
- tmpList.addLast(entry)
- item_count += 1
- size_count += record.size
+ if( !queue.in_flight_removes.contains(entry.seq) ) {
+ tmpList.addLast(entry)
+ item_count += 1
+ size_count += record.size
+ }
}
// we may need to adjust the enqueue count if entries
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1379944&r1=1379943&r2=1379944&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Sun Sep 2 11:26:04 2012
@@ -322,21 +322,11 @@ class Subscription(val queue:Queue, val
total_ack_count += 1
total_ack_size += entry.size
- if (entry.messageKey != -1) {
- val storeBatch = if( uow == null ) {
- queue.virtual_host.store.create_uow
- } else {
- uow
- }
- storeBatch.dequeue(entry.toQueueEntryRecord)
- if( uow == null ) {
- storeBatch.release
- }
- }
- queue.dequeue_item_counter += 1
- queue.dequeue_size_counter += entry.size
- queue.dequeue_ts = queue.now
+ remove()
+ entry.dequeue(uow) // entry size changes to 0
+ }
+ def remove():Unit = {
// removes this entry from the acquired list.
unlink()
if( acquired.isEmpty ) {
@@ -347,12 +337,9 @@ class Subscription(val queue:Queue, val
acquired_size -= entry.size
val next = entry.nextOrTail
- entry.remove // entry size changes to 0
-
queue.trigger_swap
next.task.run
check_finish_close
-
}
def increment_nack = total_nack_count += 1
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala?rev=1379944&r1=1379943&r2=1379944&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala Sun Sep 2 11:26:04 2012
@@ -281,3 +281,47 @@ class StompMetricsTest extends StompTest
}
}
+
+class StompLevelDBMetricsTest extends StompMetricsTest {
+
+ override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+
+ test("slow_consumer_policy='queue' /w 1 slow and 1 fast consumer.") {
+ var dest_name = next_id("queued.metrics")
+ val dest = "/topic/"+dest_name
+
+ val fast = new StompClient
+ connect("1.1", fast)
+ subscribe("fast", dest, "auto", c=fast);
+
+ val slow = new StompClient
+ connect("1.1", slow)
+ subscribe("fast", dest, "client", c=slow);
+
+ connect("1.1")
+ for( i <- 1 to 1000 ) {
+ async_send(dest, "%01204d".format(i))
+ }
+
+ for( i <- 1 to 1000 ) {
+ assert_received("%01204d".format(i),c=fast)
+ }
+
+ within(3, SECONDS) {
+ val stat = topic_status(dest_name).metrics
+ stat.queue_items should be >= (0L)
+ stat.swapped_in_items should be <= ( stat.queue_items ) // some of it swapped.
+ stat.enqueue_item_counter should be(1000L)
+ }
+
+ slow.close()
+
+ within(3, SECONDS) {
+ val stat = topic_status(dest_name).metrics
+ stat.queue_items should be (0L)
+ stat.swapped_in_items should be(0L)
+ stat.enqueue_item_counter should be(1000L)
+ }
+ }
+
+}