You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/27 22:21:12 UTC
svn commit: r1377822 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/s...
Author: chirino
Date: Mon Aug 27 20:21:12 2012
New Revision: 1377822
URL: http://svn.apache.org/viewvc?rev=1377822&view=rev
Log:
Fixes APLO-251 : Share a single queue for all consumers on topic configured with slow_consumer_policy="queue"
This should allow topics configured with slow_consumer_policy="queue" to scale a bit better as there will be fewer storage operations occurring when you have multiple consumers.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Mon Aug 27 20:21:12 2012
@@ -255,7 +255,7 @@ object TempQueueBinding {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class TempQueueBinding(topic:String, key:AnyRef, address:DestinationAddress, settings:QueueSettingsDTO) extends Binding {
+case class TempQueueBinding(topic:String, address:DestinationAddress, settings:QueueSettingsDTO) extends Binding {
import TempQueueBinding._
def binding_kind = TEMP_KIND
@@ -268,14 +268,14 @@ case class TempQueueBinding(topic:String
def unbind(router: LocalRouter, queue: Queue) = {}
def bind(router: LocalRouter, queue: Queue) = {}
- override def hashCode = if(key==null) 0 else key.hashCode
+ override def hashCode = if(topic==null) 0 else topic.hashCode
def config(host: VirtualHost) = settings
override def equals(o:Any):Boolean = o match {
- case x: TempQueueBinding => x.key == key
+ case x: TempQueueBinding => x.topic == topic
case _ => false
}
- override def toString = super.toString+":"+key
+ override def toString = super.toString+":"+topic
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Aug 27 20:21:12 2012
@@ -74,6 +74,25 @@ trait DeliveryConsumer extends Retained
def is_persistent:Boolean
}
+class DeliveryConsumerFilter(val next:DeliveryConsumer) extends DeliveryConsumer {
+ override def browser: Boolean = next.browser
+ override def close_on_drain: Boolean = next.close_on_drain
+ override def connection: Option[BrokerConnection] = next.connection
+ override def exclusive: Boolean = next.exclusive
+ override def jms_selector: String = next.jms_selector
+ override def receive_buffer_size: Int = next.receive_buffer_size
+ override def set_starting_seq(seq: Long) { next.set_starting_seq(seq) }
+ override def start_from_tail: Boolean = next.start_from_tail
+ override def user: String = next.user
+ def connect(producer: DeliveryProducer): DeliverySession = next.connect(producer)
+ def dispatch_queue: DispatchQueue = next.dispatch_queue
+ def is_persistent: Boolean = next.is_persistent
+ def matches(message: Delivery): Boolean = next.matches(message)
+ def release() { next.release() }
+ def retain() { next.retain() }
+ def retained(): Int = next.retained()
+}
+
/**
* Before a delivery producer can send Delivery objects to a delivery
* consumer, it creates a Delivery session which it uses to send
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Aug 27 20:21:12 2012
@@ -66,6 +66,7 @@ class Queue(val router: LocalRouter, val
val resource_kind = binding match {
case x:DurableSubscriptionQueueBinding=> DurableSubKind
case x:QueueDomainQueueBinding=> QueueKind
+ case x:TempQueueBinding => TopicQueueKind
case _ => OtherKind
}
@@ -551,6 +552,9 @@ class Queue(val router: LocalRouter, val
consumer_swapped_in.size_max += amount
}
+
+ def is_topic_queue = resource_kind eq TopicQueueKind
+
object messages extends Sink[(Session[Delivery], Delivery)] {
var refiller: Task = null
@@ -581,7 +585,11 @@ class Queue(val router: LocalRouter, val
// We may need to drop this enqueue or head entries due
// to the drop policy.
var drop = false
- if( full_policy ne Block ) {
+
+ if( is_topic_queue && all_subscriptions.isEmpty ) {
+ // no need to queue it..
+ drop = true
+ } else if( full_policy ne Block ) {
def eval_drop(entry:QueueEntry) = entry.state match {
case state: entry.Loaded =>
@@ -685,14 +693,17 @@ class Queue(val router: LocalRouter, val
entry.dispatch
}
- if( !consumers_keeping_up_historically ) {
- entry.swap(true)
- } else if( entry.as_loaded.is_acquired && persisted) {
- // If the message as dispatched and it's marked to get persisted anyways,
- // then it's ok if it falls out of memory since we won't need to load it again.
- entry.swap(false)
+ // entry might get dispatched and removed.
+ if( entry.isLinked ) {
+ if( !consumers_keeping_up_historically ) {
+ entry.swap(true)
+ } else if( entry.as_loaded.is_acquired && persisted) {
+ // If the message as dispatched and it's marked to get persisted anyways,
+ // then it's ok if it falls out of memory since we won't need to load it again.
+ entry.swap(false)
+ }
}
-
+
// release the store batch...
if (persisted) {
queue_delivery.uow.release
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Mon Aug 27 20:21:12 2012
@@ -636,12 +636,19 @@ class QueueEntry(val queue:Queue, val se
// the advancing subs move on to the next entry...
advance(advancing)
-// // swap this entry out if it's not going to be needed soon.
-// if( !hasSubs && prefetch_flags==0 ) {
-// // then swap out to make space...
-// var asap = !acquired
-// flush(asap)
-// }
+ // We can drop after dispatch in some cases.
+ if( queue.is_topic_queue && parked.isEmpty && getPrevious.is_head ) {
+ if (messageKey != -1) {
+ val storeBatch = queue.virtual_host.store.create_uow
+ storeBatch.dequeue(toQueueEntryRecord)
+ storeBatch.release
+ }
+ queue.dequeue_item_counter += 1
+ queue.dequeue_size_counter += size
+ queue.dequeue_ts = queue.now
+ remove
+ }
+
queue.trigger_swap
return true
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Aug 27 20:21:12 2012
@@ -40,6 +40,7 @@ class Topic(val router:LocalRouter, val
val resource_kind =SecuredResource.TopicKind
var proxy_sessions = new HashSet[DeliverySession]()
+ var topic_queue_consumers = new HashMap[DeliveryConsumer, DeliveryConsumer]()
@transient
var retained_message: Delivery = _
@@ -157,23 +158,15 @@ class Topic(val router:LocalRouter, val
}
}
- case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO, registered:DeliveryConsumer) extends DeliveryConsumer {
-
- def retained() = consumer.retained()
- def retain() = consumer.retain()
- def release() = consumer.release()
- def matches(message: Delivery) = consumer.matches(message)
- def is_persistent = consumer.is_persistent
- def dispatch_queue = consumer.dispatch_queue
- def connect(producer: DeliveryProducer) = {
- new ProxyConsumerSession(this, consumer.connect(producer))
+ case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO, registered:DeliveryConsumer) extends DeliveryConsumerFilter(consumer) {
+ override def connect(producer: DeliveryProducer) = {
+ new ProxyConsumerSession(this, next.connect(producer))
}
}
val producers = HashMap[BindableDeliveryProducer, LinkDTO]()
val consumers = HashMap[DeliveryConsumer, ProxyDeliveryConsumer]()
var durable_subscriptions = ListBuffer[Queue]()
- var consumer_queues = HashMap[DeliveryConsumer, Queue]()
var idled_at = 0L
val created_at = now
var auto_delete_after = 0
@@ -240,6 +233,15 @@ class Topic(val router:LocalRouter, val
rc.consumers.add(o)
}
+ if( topic_queue !=null ) {
+ val link = new LinkDTO()
+ link.kind = "topic-queue"
+ link.id = topic_queue.store_id.toString()
+ link.label = "shared queue"
+ link.enqueue_ts = now
+ rc.consumers.add(link)
+ }
+
// Add in the counters from the live sessions..
proxy_sessions.foreach{ session =>
val stats = from_session(session)
@@ -271,6 +273,24 @@ class Topic(val router:LocalRouter, val
var futures = List[Future[(TopicStatusDTO)=>Unit]]()
+ if ( topic_queue!=null ) {
+ val future = Future[(TopicStatusDTO)=>Unit]()
+ futures ::= future
+ topic_queue.dispatch_queue {
+ val metrics = topic_queue.get_queue_metrics
+ metrics.enqueue_item_counter = 0
+ metrics.enqueue_size_counter = 0
+ metrics.enqueue_ts = 0
+ metrics.producer_counter = 0
+ metrics.producer_count = 0
+// metrics.consumer_counter = 0
+// metrics.consumer_count = 0
+ future.set((rc)=>{
+ DestinationMetricsSupport.add_destination_metrics(rc.metrics, metrics)
+ })
+ }
+ }
+
consumers_links.foreach { case (consumer, link) =>
consumer match {
case queue:Queue =>
@@ -357,7 +377,30 @@ class Topic(val router:LocalRouter, val
}
}
- def bind(address: BindAddress, consumer:DeliveryConsumer) = {
+ var topic_queue:Queue = null
+
+ def bind(address: BindAddress, consumer:DeliveryConsumer):Unit = {
+
+ def send_retained = {
+ val r = retained_message
+ if (r != null) {
+ val copy = r.copy()
+ copy.sender ::= address
+
+ val producer = new DeliveryProducerRoute(router) {
+ refiller = NOOP
+ val dispatch_queue = createQueue()
+ override protected def on_connected = {
+ copy.ack = (d,x) => consumer.dispatch_queue {
+ unbind(consumer :: Nil)
+ }
+ offer(copy) // producer supports 1 message overflow.
+ }
+ }
+ producer.bind(consumer :: Nil)
+ producer.connected()
+ }
+ }
val target = address.domain match {
case "queue" | "dsub"=>
@@ -368,11 +411,23 @@ class Topic(val router:LocalRouter, val
case "queue" =>
// create a temp queue so that it can spool
- val queue = router._create_queue(new TempQueueBinding(id, consumer, address, Option(config.subscription).getOrElse(new QueueSettingsDTO)))
- queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
- queue.bind(List(consumer))
- consumer_queues += consumer->queue
- queue
+ if ( topic_queue==null ) {
+ topic_queue = router._create_queue(new TempQueueBinding(id, Topic.this.address, Option(config.subscription).getOrElse(new QueueSettingsDTO)))
+ producers.keys.foreach({ r=>
+ r.bind(List(topic_queue))
+ })
+ }
+ val proxy = new DeliveryConsumerFilter(consumer) {
+ // Make this consumer act like a continuous queue browser
+ override def browser = true
+ override def start_from_tail = true
+ override def close_on_drain = false
+ override def exclusive = false
+ }
+ topic_queue_consumers.put(consumer, proxy)
+ topic_queue.bind(List(proxy))
+ send_retained
+ return
case "block" =>
// just have dispatcher dispatch directly to them..
@@ -390,13 +445,7 @@ class Topic(val router:LocalRouter, val
case x:TempQueueBinding =>
link.kind = "topic-queue"
link.id = queue.store_id.toString()
- x.key match {
- case target:DeliveryConsumer=>
- for(connection <- target.connection) {
- link.label = connection.transport.getRemoteAddress.toString
- }
- case _ =>
- }
+ link.label = "shared queue"
case x:QueueDomainQueueBinding =>
link.kind = "queue"
link.id = queue.id
@@ -414,25 +463,7 @@ class Topic(val router:LocalRouter, val
}
}
- val r = retained_message
- if (r != null) {
- val copy = r.copy()
- copy.sender ::= address
-
- val producer = new DeliveryProducerRoute(router) {
- refiller = NOOP
- val dispatch_queue = createQueue()
- override protected def on_connected = {
- copy.ack = (d,x) => consumer.dispatch_queue {
- unbind(consumer :: Nil)
- }
- offer(copy) // producer supports 1 message overflow.
- }
- }
- producer.bind(consumer :: Nil)
- producer.connected()
- }
-
+ send_retained
val proxy = ProxyDeliveryConsumer(target, link, consumer)
consumers.put(consumer, proxy)
topic_metrics.consumer_counter += 1
@@ -444,39 +475,47 @@ class Topic(val router:LocalRouter, val
}
def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
+ val list = topic_queue_consumers.remove(consumer) match {
+ case Some(consumer)=>
+ topic_queue.unbind(List(consumer))
+
+ // Once we don't have any subscribers.. delete the queue.
+ if( topic_queue_consumers.isEmpty ) {
+ val queue = topic_queue
+ topic_queue = null
- for(proxy <- consumers.remove(consumer)) {
- val list = consumer_queues.remove(consumer) match {
- case Some(queue) =>
- queue.unbind(List(consumer))
- queue.binding match {
- case x:TempQueueBinding =>
- queue.dispatch_queue {
- val metrics = queue.get_queue_metrics
- router.dispatch_queue {
- router._destroy_queue(queue)
- }
- dispatch_queue {
- topic_metrics.dequeue_item_counter += metrics.dequeue_item_counter
- topic_metrics.dequeue_size_counter += metrics.dequeue_size_counter
- topic_metrics.dequeue_ts = topic_metrics.dequeue_ts max metrics.dequeue_ts
- topic_metrics.nack_item_counter += metrics.nack_item_counter
- topic_metrics.nack_size_counter += metrics.nack_size_counter
- topic_metrics.nack_ts = topic_metrics.nack_ts max metrics.nack_ts
- topic_metrics.expired_item_counter += metrics.expired_item_counter
- topic_metrics.expired_size_counter += metrics.expired_size_counter
- topic_metrics.expired_ts = topic_metrics.expired_ts max metrics.expired_ts
- }
+ queue.dispatch_queue {
+ if( queue.all_subscriptions.isEmpty ) {
+ val metrics = queue.get_queue_metrics
+ router.dispatch_queue {
+ router._destroy_queue(queue)
}
+ dispatch_queue {
+ topic_metrics.dequeue_item_counter += metrics.dequeue_item_counter
+ topic_metrics.dequeue_size_counter += metrics.dequeue_size_counter
+ topic_metrics.dequeue_ts = topic_metrics.dequeue_ts max metrics.dequeue_ts
+ topic_metrics.nack_item_counter += metrics.nack_item_counter
+ topic_metrics.nack_size_counter += metrics.nack_size_counter
+ topic_metrics.nack_ts = topic_metrics.nack_ts max metrics.nack_ts
+ topic_metrics.expired_item_counter += metrics.expired_item_counter
+ topic_metrics.expired_size_counter += metrics.expired_size_counter
+ topic_metrics.expired_ts = topic_metrics.expired_ts max metrics.expired_ts
+ }
+ }
}
- List(queue)
- case None =>
- add_dequeue_counters(topic_metrics, proxy.link)
- List(consumer)
- }
- producers.keys.foreach({ r=>
- r.unbind(list)
- })
+ }
+ List()
+ case None =>
+ consumers.remove(consumer) match {
+ case Some(consumer)=>
+ add_dequeue_counters(topic_metrics, consumer.link)
+ List(consumer)
+ case None =>
+ List()
+ }
+ }
+ for( producer <- producers.keys ) {
+ producer.unbind(list)
}
check_idle
}
@@ -510,7 +549,11 @@ class Topic(val router:LocalRouter, val
}
producers.put(producer, link)
topic_metrics.producer_counter += 1
- producer.bind(producer_tracker::consumers.values.toList )
+ var targets:List[DeliveryConsumer] = producer_tracker :: consumers.values.toList
+ if( topic_queue !=null ) {
+ targets ::= topic_queue
+ }
+ producer.bind(targets )
check_idle
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala Mon Aug 27 20:21:12 2012
@@ -57,6 +57,10 @@ object SecuredResource {
val id = "topic"
val actions = Set(ADMIN, MONITOR, CONFIG, CREATE, DESTROY, SEND, RECEIVE)
}
+ object TopicQueueKind extends ResourceKind{
+ val id = "topic-queue"
+ val actions = Set(ADMIN, MONITOR, CONFIG, CREATE, DESTROY, SEND, RECEIVE)
+ }
object QueueKind extends ResourceKind{
val id = "queue"
val actions = Set(ADMIN, MONITOR, CONFIG, CREATE, DESTROY, SEND, RECEIVE, CONSUME)
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala Mon Aug 27 20:21:12 2012
@@ -94,8 +94,8 @@ class StompMetricsTest extends StompTest
stat2.producer_count should be(stat1.producer_count)
stat2.consumer_count should be(stat1.consumer_count)
stat2.enqueue_item_counter should be(stat1.enqueue_item_counter + 2)
- stat2.dequeue_item_counter should be(stat1.dequeue_item_counter + 0)
- stat2.queue_items should be(stat1.queue_items + 2)
+ stat2.dequeue_item_counter should be(stat1.dequeue_item_counter + 2)
+ stat2.queue_items should be(stat1.queue_items)
// Close the subscription.
unsubscribe("0")
@@ -105,8 +105,8 @@ class StompMetricsTest extends StompTest
stat3.producer_count should be(stat1.producer_count)
stat3.consumer_count should be(stat1.consumer_count - 1)
stat3.enqueue_item_counter should be(stat1.enqueue_item_counter + 2)
- stat3.dequeue_item_counter should be(stat1.dequeue_item_counter + 0)
- stat3.queue_items should be(stat1.queue_items - 1)
+ stat3.dequeue_item_counter should be(stat1.dequeue_item_counter + 2)
+ stat3.queue_items should be(stat1.queue_items)
}
}
@@ -184,8 +184,8 @@ class StompMetricsTest extends StompTest
stat2.consumers.size() should be(1)
stat2.dsubs.size() should be(0)
stat2.metrics.enqueue_item_counter should be(3)
- stat2.metrics.dequeue_item_counter should be(0)
- stat2.metrics.queue_items should be(2)
+ stat2.metrics.dequeue_item_counter should be(2)
+ stat2.metrics.queue_items should be(0)
// Ack now..
ack2(true);