You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/27 22:21:12 UTC

svn commit: r1377822 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/s...

Author: chirino
Date: Mon Aug 27 20:21:12 2012
New Revision: 1377822

URL: http://svn.apache.org/viewvc?rev=1377822&view=rev
Log:
Fixes APLO-251 : Share a single queue for all consumers on topic configured with slow_consumer_policy="queue"

This should allow topics configured with slow_consumer_policy="queue" to scale a bit better as there will be fewer storage operations occurring when you have multiple consumers.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Mon Aug 27 20:21:12 2012
@@ -255,7 +255,7 @@ object TempQueueBinding {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class TempQueueBinding(topic:String, key:AnyRef, address:DestinationAddress, settings:QueueSettingsDTO) extends Binding {
+case class TempQueueBinding(topic:String, address:DestinationAddress, settings:QueueSettingsDTO) extends Binding {
   import TempQueueBinding._
 
   def binding_kind = TEMP_KIND
@@ -268,14 +268,14 @@ case class TempQueueBinding(topic:String
   def unbind(router: LocalRouter, queue: Queue) = {}
   def bind(router: LocalRouter, queue: Queue) = {}
 
-  override def hashCode = if(key==null) 0 else key.hashCode
+  override def hashCode = if(topic==null) 0 else topic.hashCode
 
   def config(host: VirtualHost) = settings
 
   override def equals(o:Any):Boolean = o match {
-    case x: TempQueueBinding => x.key == key
+    case x: TempQueueBinding => x.topic == topic
     case _ => false
   }
 
-  override def toString = super.toString+":"+key
+  override def toString = super.toString+":"+topic
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Aug 27 20:21:12 2012
@@ -74,6 +74,25 @@ trait DeliveryConsumer extends Retained 
   def is_persistent:Boolean
 }
 
+class DeliveryConsumerFilter(val next:DeliveryConsumer) extends DeliveryConsumer {
+  override def browser: Boolean = next.browser
+  override def close_on_drain: Boolean = next.close_on_drain
+  override def connection: Option[BrokerConnection] = next.connection
+  override def exclusive: Boolean = next.exclusive
+  override def jms_selector: String = next.jms_selector
+  override def receive_buffer_size: Int = next.receive_buffer_size
+  override def set_starting_seq(seq: Long) { next.set_starting_seq(seq)  }
+  override def start_from_tail: Boolean = next.start_from_tail
+  override def user: String = next.user
+  def connect(producer: DeliveryProducer): DeliverySession = next.connect(producer)
+  def dispatch_queue: DispatchQueue = next.dispatch_queue
+  def is_persistent: Boolean = next.is_persistent
+  def matches(message: Delivery): Boolean = next.matches(message)
+  def release() { next.release() }
+  def retain() { next.retain() }
+  def retained(): Int = next.retained()
+}
+
 /**
  * Before a delivery producer can send Delivery objects to a delivery
  * consumer, it creates a Delivery session which it uses to send

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Aug 27 20:21:12 2012
@@ -66,6 +66,7 @@ class Queue(val router: LocalRouter, val
   val resource_kind = binding match {
     case x:DurableSubscriptionQueueBinding=> DurableSubKind
     case x:QueueDomainQueueBinding=> QueueKind
+    case x:TempQueueBinding => TopicQueueKind
     case _ => OtherKind
   }
 
@@ -551,6 +552,9 @@ class Queue(val router: LocalRouter, val
     consumer_swapped_in.size_max += amount
   }
 
+
+  def is_topic_queue = resource_kind eq TopicQueueKind
+
   object messages extends Sink[(Session[Delivery], Delivery)] {
 
     var refiller: Task = null
@@ -581,7 +585,11 @@ class Queue(val router: LocalRouter, val
         // We may need to drop this enqueue or head entries due
         // to the drop policy.
         var drop = false
-        if( full_policy ne Block ) {
+
+        if( is_topic_queue && all_subscriptions.isEmpty ) {
+          // no need to queue it..
+          drop = true
+        } else if( full_policy ne Block ) {
 
           def eval_drop(entry:QueueEntry) = entry.state match {
             case state: entry.Loaded =>
@@ -685,14 +693,17 @@ class Queue(val router: LocalRouter, val
           entry.dispatch
         }
 
-        if( !consumers_keeping_up_historically  ) {
-          entry.swap(true)
-        } else if( entry.as_loaded.is_acquired && persisted) {
-          // If the message as dispatched and it's marked to get persisted anyways,
-          // then it's ok if it falls out of memory since we won't need to load it again.
-          entry.swap(false)
+        // entry might get dispatched and removed.
+        if( entry.isLinked ) {
+          if( !consumers_keeping_up_historically  ) {
+            entry.swap(true)
+          } else if( entry.as_loaded.is_acquired && persisted) {
+            // If the message as dispatched and it's marked to get persisted anyways,
+            // then it's ok if it falls out of memory since we won't need to load it again.
+            entry.swap(false)
+          }
         }
-        
+
         // release the store batch...
         if (persisted) {
           queue_delivery.uow.release

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Mon Aug 27 20:21:12 2012
@@ -636,12 +636,19 @@ class QueueEntry(val queue:Queue, val se
         // the advancing subs move on to the next entry...
         advance(advancing)
 
-//        // swap this entry out if it's not going to be needed soon.
-//        if( !hasSubs && prefetch_flags==0 ) {
-//          // then swap out to make space...
-//          var asap = !acquired
-//          flush(asap)
-//        }
+        // We can drop after dispatch in some cases.
+        if( queue.is_topic_queue  && parked.isEmpty && getPrevious.is_head ) {
+          if (messageKey != -1) {
+            val storeBatch = queue.virtual_host.store.create_uow
+            storeBatch.dequeue(toQueueEntryRecord)
+            storeBatch.release
+          }
+          queue.dequeue_item_counter += 1
+          queue.dequeue_size_counter += size
+          queue.dequeue_ts = queue.now
+          remove
+        }
+
         queue.trigger_swap
         return true
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Mon Aug 27 20:21:12 2012
@@ -40,6 +40,7 @@ class Topic(val router:LocalRouter, val 
 
   val resource_kind =SecuredResource.TopicKind
   var proxy_sessions = new HashSet[DeliverySession]()
+  var topic_queue_consumers = new HashMap[DeliveryConsumer, DeliveryConsumer]()
 
   @transient
   var retained_message: Delivery = _
@@ -157,23 +158,15 @@ class Topic(val router:LocalRouter, val 
     }
   }
 
-  case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO, registered:DeliveryConsumer) extends DeliveryConsumer {
-
-    def retained() = consumer.retained()
-    def retain() = consumer.retain()
-    def release() = consumer.release()
-    def matches(message: Delivery) = consumer.matches(message)
-    def is_persistent = consumer.is_persistent
-    def dispatch_queue = consumer.dispatch_queue
-    def connect(producer: DeliveryProducer) = {
-      new ProxyConsumerSession(this, consumer.connect(producer))
+  case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO, registered:DeliveryConsumer) extends DeliveryConsumerFilter(consumer) {
+    override def connect(producer: DeliveryProducer) = {
+      new ProxyConsumerSession(this, next.connect(producer))
     }
   }
 
   val producers = HashMap[BindableDeliveryProducer, LinkDTO]()
   val consumers = HashMap[DeliveryConsumer, ProxyDeliveryConsumer]()
   var durable_subscriptions = ListBuffer[Queue]()
-  var consumer_queues = HashMap[DeliveryConsumer, Queue]()
   var idled_at = 0L
   val created_at = now
   var auto_delete_after = 0
@@ -240,6 +233,15 @@ class Topic(val router:LocalRouter, val 
       rc.consumers.add(o)
     }
 
+    if( topic_queue !=null ) {
+      val link = new LinkDTO()
+      link.kind = "topic-queue"
+      link.id = topic_queue.store_id.toString()
+      link.label = "shared queue"
+      link.enqueue_ts = now
+      rc.consumers.add(link)
+    }
+
     // Add in the counters from the live sessions..
     proxy_sessions.foreach{ session =>
       val stats = from_session(session)
@@ -271,6 +273,24 @@ class Topic(val router:LocalRouter, val 
 
     var futures = List[Future[(TopicStatusDTO)=>Unit]]()
 
+    if ( topic_queue!=null ) {
+      val future = Future[(TopicStatusDTO)=>Unit]()
+      futures ::= future
+      topic_queue.dispatch_queue {
+        val metrics = topic_queue.get_queue_metrics
+        metrics.enqueue_item_counter = 0
+        metrics.enqueue_size_counter = 0
+        metrics.enqueue_ts = 0
+        metrics.producer_counter = 0
+        metrics.producer_count = 0
+//        metrics.consumer_counter = 0
+//        metrics.consumer_count = 0
+        future.set((rc)=>{
+          DestinationMetricsSupport.add_destination_metrics(rc.metrics, metrics)
+        })
+      }
+    }
+
     consumers_links.foreach { case (consumer, link) =>
       consumer match {
         case queue:Queue =>
@@ -357,7 +377,30 @@ class Topic(val router:LocalRouter, val 
     }
   }
 
-  def bind(address: BindAddress, consumer:DeliveryConsumer) = {
+  var topic_queue:Queue = null
+
+  def bind(address: BindAddress, consumer:DeliveryConsumer):Unit = {
+
+    def send_retained = {
+      val r = retained_message
+      if (r != null) {
+        val copy = r.copy()
+        copy.sender ::= address
+
+        val producer = new  DeliveryProducerRoute(router) {
+          refiller = NOOP
+          val dispatch_queue = createQueue()
+          override protected def on_connected = {
+            copy.ack = (d,x) => consumer.dispatch_queue {
+              unbind(consumer :: Nil)
+            }
+            offer(copy) // producer supports 1 message overflow.
+          }
+        }
+        producer.bind(consumer :: Nil)
+        producer.connected()
+      }
+    }
 
     val target = address.domain match {
       case "queue" | "dsub"=>
@@ -368,11 +411,23 @@ class Topic(val router:LocalRouter, val 
           case "queue" =>
 
             // create a temp queue so that it can spool
-            val queue = router._create_queue(new TempQueueBinding(id, consumer, address, Option(config.subscription).getOrElse(new QueueSettingsDTO)))
-            queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
-            queue.bind(List(consumer))
-            consumer_queues += consumer->queue
-            queue
+            if ( topic_queue==null ) {
+              topic_queue = router._create_queue(new TempQueueBinding(id, Topic.this.address, Option(config.subscription).getOrElse(new QueueSettingsDTO)))
+              producers.keys.foreach({ r=>
+                r.bind(List(topic_queue))
+              })
+            }
+            val proxy = new DeliveryConsumerFilter(consumer) {
+              // Make this consumer act like a continuous queue browser
+              override def browser = true
+              override def start_from_tail = true
+              override def close_on_drain = false
+              override def exclusive = false
+            }
+            topic_queue_consumers.put(consumer, proxy)
+            topic_queue.bind(List(proxy))
+            send_retained
+            return
 
           case "block" =>
             // just have dispatcher dispatch directly to them..
@@ -390,13 +445,7 @@ class Topic(val router:LocalRouter, val 
           case x:TempQueueBinding =>
             link.kind = "topic-queue"
             link.id = queue.store_id.toString()
-            x.key match {
-              case target:DeliveryConsumer=>
-                for(connection <- target.connection) {
-                  link.label = connection.transport.getRemoteAddress.toString
-                }
-              case _ =>
-            }
+            link.label = "shared queue"
           case x:QueueDomainQueueBinding =>
             link.kind = "queue"
             link.id = queue.id
@@ -414,25 +463,7 @@ class Topic(val router:LocalRouter, val 
         }
     }
 
-    val r = retained_message
-    if (r != null) {
-      val copy = r.copy()
-      copy.sender ::= address
-
-      val producer = new  DeliveryProducerRoute(router) {
-        refiller = NOOP
-        val dispatch_queue = createQueue()
-        override protected def on_connected = {
-          copy.ack = (d,x) => consumer.dispatch_queue {
-            unbind(consumer :: Nil)
-          }
-          offer(copy) // producer supports 1 message overflow.
-        }
-      }
-      producer.bind(consumer :: Nil)
-      producer.connected()
-    }
-    
+    send_retained
     val proxy = ProxyDeliveryConsumer(target, link, consumer)
     consumers.put(consumer, proxy)
     topic_metrics.consumer_counter += 1
@@ -444,39 +475,47 @@ class Topic(val router:LocalRouter, val 
   }
 
   def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
+    val list = topic_queue_consumers.remove(consumer) match {
+      case Some(consumer)=>
+        topic_queue.unbind(List(consumer))
+
+        // Once we don't have any subscribers.. delete the queue.
+        if( topic_queue_consumers.isEmpty ) {
+          val queue = topic_queue
+          topic_queue = null
 
-    for(proxy <- consumers.remove(consumer)) {
-      val list = consumer_queues.remove(consumer) match {
-        case Some(queue) =>
-          queue.unbind(List(consumer))
-          queue.binding match {
-            case x:TempQueueBinding =>
-              queue.dispatch_queue {
-                val metrics = queue.get_queue_metrics
-                router.dispatch_queue {
-                  router._destroy_queue(queue)
-                }
-                dispatch_queue {
-                  topic_metrics.dequeue_item_counter += metrics.dequeue_item_counter
-                  topic_metrics.dequeue_size_counter += metrics.dequeue_size_counter
-                  topic_metrics.dequeue_ts = topic_metrics.dequeue_ts max metrics.dequeue_ts
-                  topic_metrics.nack_item_counter += metrics.nack_item_counter
-                  topic_metrics.nack_size_counter += metrics.nack_size_counter
-                  topic_metrics.nack_ts  = topic_metrics.nack_ts max metrics.nack_ts
-                  topic_metrics.expired_item_counter += metrics.expired_item_counter
-                  topic_metrics.expired_size_counter += metrics.expired_size_counter
-                  topic_metrics.expired_ts  = topic_metrics.expired_ts max metrics.expired_ts
-                }
+          queue.dispatch_queue {
+            if( queue.all_subscriptions.isEmpty ) {
+              val metrics = queue.get_queue_metrics
+              router.dispatch_queue {
+                router._destroy_queue(queue)
               }
+              dispatch_queue {
+                topic_metrics.dequeue_item_counter += metrics.dequeue_item_counter
+                topic_metrics.dequeue_size_counter += metrics.dequeue_size_counter
+                topic_metrics.dequeue_ts = topic_metrics.dequeue_ts max metrics.dequeue_ts
+                topic_metrics.nack_item_counter += metrics.nack_item_counter
+                topic_metrics.nack_size_counter += metrics.nack_size_counter
+                topic_metrics.nack_ts  = topic_metrics.nack_ts max metrics.nack_ts
+                topic_metrics.expired_item_counter += metrics.expired_item_counter
+                topic_metrics.expired_size_counter += metrics.expired_size_counter
+                topic_metrics.expired_ts  = topic_metrics.expired_ts max metrics.expired_ts
+              }
+            }
           }
-          List(queue)
-        case None =>
-          add_dequeue_counters(topic_metrics, proxy.link)
-          List(consumer)
-      }
-      producers.keys.foreach({ r=>
-        r.unbind(list)
-      })
+        }
+        List()
+      case None =>
+        consumers.remove(consumer) match {
+          case Some(consumer)=>
+            add_dequeue_counters(topic_metrics, consumer.link)
+            List(consumer)
+          case None =>
+            List()
+        }
+    }
+    for( producer <- producers.keys ) {
+     producer.unbind(list)
     }
     check_idle
   }
@@ -510,7 +549,11 @@ class Topic(val router:LocalRouter, val 
     }
     producers.put(producer, link)
     topic_metrics.producer_counter += 1
-    producer.bind(producer_tracker::consumers.values.toList )
+    var targets:List[DeliveryConsumer] = producer_tracker :: consumers.values.toList
+    if( topic_queue !=null ) {
+      targets ::= topic_queue
+    }
+    producer.bind(targets )
     check_idle
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authorizer.scala Mon Aug 27 20:21:12 2012
@@ -57,6 +57,10 @@ object SecuredResource {
     val id = "topic"
     val actions = Set(ADMIN, MONITOR, CONFIG, CREATE, DESTROY, SEND, RECEIVE)
   }
+  object TopicQueueKind extends ResourceKind{
+    val id = "topic-queue"
+    val actions = Set(ADMIN, MONITOR, CONFIG, CREATE, DESTROY, SEND, RECEIVE)
+  }
   object QueueKind extends ResourceKind{
     val id = "queue"
     val actions = Set(ADMIN, MONITOR, CONFIG, CREATE, DESTROY, SEND, RECEIVE, CONSUME)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala?rev=1377822&r1=1377821&r2=1377822&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala Mon Aug 27 20:21:12 2012
@@ -94,8 +94,8 @@ class StompMetricsTest extends StompTest
     stat2.producer_count should be(stat1.producer_count)
     stat2.consumer_count should be(stat1.consumer_count)
     stat2.enqueue_item_counter should be(stat1.enqueue_item_counter + 2)
-    stat2.dequeue_item_counter should be(stat1.dequeue_item_counter + 0)
-    stat2.queue_items should be(stat1.queue_items + 2)
+    stat2.dequeue_item_counter should be(stat1.dequeue_item_counter + 2)
+    stat2.queue_items should be(stat1.queue_items)
 
     // Close the subscription.
     unsubscribe("0")
@@ -105,8 +105,8 @@ class StompMetricsTest extends StompTest
       stat3.producer_count should be(stat1.producer_count)
       stat3.consumer_count should be(stat1.consumer_count - 1)
       stat3.enqueue_item_counter should be(stat1.enqueue_item_counter + 2)
-      stat3.dequeue_item_counter should be(stat1.dequeue_item_counter + 0)
-      stat3.queue_items should be(stat1.queue_items - 1)
+      stat3.dequeue_item_counter should be(stat1.dequeue_item_counter + 2)
+      stat3.queue_items should be(stat1.queue_items)
     }
   }
 
@@ -184,8 +184,8 @@ class StompMetricsTest extends StompTest
     stat2.consumers.size() should be(1)
     stat2.dsubs.size() should be(0)
     stat2.metrics.enqueue_item_counter should be(3)
-    stat2.metrics.dequeue_item_counter should be(0)
-    stat2.metrics.queue_items should be(2)
+    stat2.metrics.dequeue_item_counter should be(2)
+    stat2.metrics.queue_items should be(0)
 
     // Ack now..
     ack2(true);