You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/26 17:18:36 UTC

svn commit: r1162139 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/ap...

Author: chirino
Date: Fri Aug 26 15:18:35 2011
New Revision: 1162139

URL: http://svn.apache.org/viewvc?rev=1162139&view=rev
Log:
Fix NPE that occurred when you used queues to spool slow consumers on topics, also setup a proper link to that queue via the REST API.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index Fri Aug 26 15:18:35 2011
@@ -16,4 +16,3 @@
 ## ---------------------------------------------------------------------------
 org.apache.activemq.apollo.broker.QueueDomainQueueBinding
 org.apache.activemq.apollo.broker.DurableSubscriptionQueueBinding
-org.apache.activemq.apollo.broker.TempQueueBinding
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Fri Aug 26 15:18:35 2011
@@ -250,20 +250,19 @@ class DurableSubscriptionQueueBinding(va
 }
 
 
-object TempQueueBinding extends BindingFactory {
+object TempQueueBinding {
   val TEMP_DATA = new AsciiBuffer("")
   val TEMP_KIND = new AsciiBuffer("tmp")
-  val TEMP_DTO = null
 
-  def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
-    if( binding_kind == TEMP_KIND ) {
-      new TempQueueBinding("", "")
-    } else {
-      null
-    }
-  }
-
-  def create(binding_dto:DestinationDTO) = throw new UnsupportedOperationException
+//  def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
+//    if( binding_kind == TEMP_KIND ) {
+//      new TempQueueBinding("", "")
+//    } else {
+//      null
+//    }
+//  }
+//
+//  def create(binding_dto:DestinationDTO) = throw new UnsupportedOperationException
 }
 
 /**
@@ -272,28 +271,24 @@ object TempQueueBinding extends BindingF
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class TempQueueBinding(val key:AnyRef, val id:String) extends Binding {
+class TempQueueBinding(val key:AnyRef, val destination:Path, val binding_dto:DestinationDTO) extends Binding {
   import TempQueueBinding._
 
-  def this(c:DeliveryConsumer) = this(c, c.connection.map(_.transport.getRemoteAddress.toString).getOrElse("known") )
-
-  val destination = null
   def binding_kind = TEMP_KIND
-  def binding_dto = TEMP_DTO
   def binding_data = TEMP_DATA
 
-  def unbind(router: LocalRouter, queue: Queue) = {
-  }
-
-  def bind(router: LocalRouter, queue: Queue) = {
-  }
+  def unbind(router: LocalRouter, queue: Queue) = {}
+  def bind(router: LocalRouter, queue: Queue) = {}
 
   override def hashCode = if(key==null) 0 else key.hashCode
 
+  def id = key.toString
+
+  def config(host: VirtualHost) = new QueueDTO
+
   override def equals(o:Any):Boolean = o match {
     case x: TempQueueBinding => x.key == key
     case _ => false
   }
 
-  def config(host: VirtualHost) = new QueueDTO
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Fri Aug 26 15:18:35 2011
@@ -375,7 +375,7 @@ class LocalRouter(val virtual_host:Virtu
         durable_subscriptions_by_path.remove(path, queue)
         var matches = get_destination_matches(path)
         matches.foreach( _.unbind_durable_subscription(destination, queue) )
-        _destroy_queue(queue.id, null)
+        _destroy_queue(queue)
       }
     }
 
@@ -457,7 +457,7 @@ class LocalRouter(val virtual_host:Virtu
         for( queue <- dest.durable_subscriptions ) {
           // we delete the durable sub if it's not wildcard'ed
           if( !PathParser.containsWildCards(queue.binding.destination) ) {
-            _destroy_queue(queue.id, null)
+            _destroy_queue(queue)
           }
         }
 
@@ -465,7 +465,7 @@ class LocalRouter(val virtual_host:Virtu
           consumer match {
             case queue:Queue =>
               // Delete any attached queue consumers..
-              _destroy_queue(queue.id, null)
+              _destroy_queue(queue)
           }
         }
 
@@ -928,7 +928,7 @@ class LocalRouter(val virtual_host:Virtu
 
   protected def _stop(on_completed: Runnable) = {
 //    val tracker = new LoggingTracker("router shutdown", virtual_host.console_log, dispatch_queue)
-    queues_by_id.valuesIterator.foreach { queue=>
+    queues_by_store_id.valuesIterator.foreach { queue=>
       queue.stop
 //      tracker.stop(queue)
     }
@@ -1118,7 +1118,7 @@ class LocalRouter(val virtual_host:Virtu
   /////////////////////////////////////////////////////////////////////////////
 
   var queues_by_binding = LinkedHashMap[Binding, Queue]()
-  var queues_by_id = LinkedHashMap[String, Queue]()
+  var queues_by_store_id = LinkedHashMap[Long, Queue]()
 
   /**
    * Gets an existing queue.
@@ -1130,8 +1130,8 @@ class LocalRouter(val virtual_host:Virtu
   /**
    * Gets an existing queue.
    */
-  def get_queue(id:String) = dispatch_queue ! {
-    queues_by_id.get(id)
+  def get_queue(id:Long) = dispatch_queue ! {
+    queues_by_store_id.get(id)
   }
 
 
@@ -1148,7 +1148,7 @@ class LocalRouter(val virtual_host:Virtu
 
     queue.start
     queues_by_binding.put(binding, queue)
-    queues_by_id.put(queue.id, queue)
+    queues_by_store_id.put(qid, queue)
 
     // this causes the queue to get registered in the right location in
     // the router.
@@ -1159,10 +1159,10 @@ class LocalRouter(val virtual_host:Virtu
   /**
    * Returns true if the queue no longer exists.
    */
-  def destroy_queue(id:String, security:SecurityContext) = dispatch_queue ! { _destroy_queue(id,security) }
+  def destroy_queue(id:Long, security:SecurityContext) = dispatch_queue ! { _destroy_queue(id,security) }
 
-  def _destroy_queue(id:String, security:SecurityContext):Option[String] = {
-    queues_by_id.get(id) match {
+  def _destroy_queue(id:Long, security:SecurityContext):Option[String] = {
+    queues_by_store_id.get(id) match {
       case Some(queue) =>
         _destroy_queue(queue,security)
       case None =>
@@ -1201,7 +1201,7 @@ class LocalRouter(val virtual_host:Virtu
 
       queue.binding.unbind(this, queue)
       queues_by_binding.remove(queue.binding)
-      queues_by_id.remove(queue.id)
+      queues_by_store_id.remove(queue.store_id)
       if (queue.tune_persistent) {
         queue.dispatch_queue {
           virtual_host.store.remove_queue(queue.store_id) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Aug 26 15:18:35 2011
@@ -46,7 +46,6 @@ import Queue._
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class Queue(val router: LocalRouter, val store_id:Long, var binding:Binding, var config:QueueDTO) extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService with DomainDestination with Dispatched {
-
   def id = binding.id
 
   override def toString = binding.destination.toString

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Aug 26 15:18:35 2011
@@ -32,7 +32,7 @@ trait Router extends Service {
 
   def virtual_host:VirtualHost
 
-  def get_queue(dto:String):Option[Queue] @suspendable
+  def get_queue(dto:Long):Option[Queue] @suspendable
 
   def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext) : Option[String] @suspendable
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Fri Aug 26 15:18:35 2011
@@ -355,7 +355,7 @@ class Session[T](val producer_queue:Disp
   @volatile
   var enqueue_size_counter = 0L
   @volatile
-  var enqueue_ts = 0L
+  var enqueue_ts = mux.time_stamp
 
   // create a source to coalesce credit events back to the producer side...
   val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Fri Aug 26 15:18:35 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit
 import org.fusesource.hawtdispatch._
 import collection.mutable.{HashSet, HashMap, ListBuffer}
 import java.lang.Long
+import com.sun.jdi.connect.spi.TransportService.ListenKey
 
 /**
  * <p>
@@ -134,7 +135,7 @@ class Topic(val router:LocalRouter, val 
     rc.metrics.producer_counter = producer_counter
     rc.metrics.consumer_counter = consumer_counter
     rc.metrics.producer_count = producers.size
-    rc.metrics.consumer_counter = consumers.size
+    rc.metrics.consumer_count = consumers.size
 
     this.durable_subscriptions.foreach { q =>
       rc.dsubs.add(q.id)
@@ -247,7 +248,7 @@ class Topic(val router:LocalRouter, val 
           case "queue" =>
 
             // create a temp queue so that it can spool
-            val queue = router._create_queue(new TempQueueBinding(consumer))
+            val queue = router._create_queue(new TempQueueBinding(consumer, path, destination_dto))
             queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
             queue.bind(List(consumer))
             consumer_queues += consumer->queue
@@ -260,25 +261,37 @@ class Topic(val router:LocalRouter, val 
     }
 
     val link = new LinkDTO()
+    link.kind = "unknown"
+    link.label = "unknown"
+    link.enqueue_ts = now
     target match {
       case queue:Queue =>
-        link.kind = "queue"
-        link.id = queue.id
-        link.label = queue.id
+        queue.binding match {
+          case x:TempQueueBinding =>
+            link.kind = "topic-queue"
+            link.id = queue.store_id.toString()
+            x.key match {
+              case target:DeliveryConsumer=>
+                for(connection <- target.connection) {
+                  link.label = connection.transport.getRemoteAddress.toString
+                }
+              case _ =>
+            }
+          case x:QueueDomainQueueBinding =>
+            link.kind = "queue"
+            link.id = queue.id
+            link.label = queue.id
+        }
       case _ =>
-        target.connection match {
-          case Some(connection) =>
-            link.kind = "connection"
-            link.id = connection.id.toString
-            link.label = connection.transport.getRemoteAddress.toString
-          case _ =>
-            link.kind = "unknown"
-            link.label = "unknown"
+        for(connection <- target.connection) {
+          link.kind = "connection"
+          link.id = connection.id.toString
+          link.label = connection.transport.getRemoteAddress.toString
         }
     }
 
     val proxy = ProxyDeliveryConsumer(target, link)
-    consumers.put(target, proxy)
+    consumers.put(consumer, proxy)
     consumer_counter += 1
     val list = proxy :: Nil
     producers.keys.foreach({ r=>
@@ -296,7 +309,7 @@ class Topic(val router:LocalRouter, val 
           queue.unbind(List(consumer))
           queue.binding match {
             case x:TempQueueBinding =>
-              router._destroy_queue(queue.id, null)
+              router._destroy_queue(queue)
           }
           List(queue)
         case None =>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Aug 26 15:18:35 2011
@@ -693,6 +693,11 @@ class StompProtocolHandler extends Proto
     }  catch {
       case e: Break =>
       case e:Exception =>
+        // To avoid double logging to the same log category..
+        if( connection_log!=StompProtocolHandler ) {
+          // but we also want the error on the apollo.log file.
+          warn("Internal Server Error", e)
+        }
         async_die("Internal Server Error", e);
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Fri Aug 26 15:18:35 2011
@@ -561,6 +561,18 @@ case class BrokerResource() extends Reso
     }
   }
 
+  @GET @Path("virtual-hosts/{id}/topic-queues/{name:.*}/{qid}")
+  def topic(@PathParam("id") id : String,@PathParam("name") name : String,  @PathParam("qid") qid : Long, @QueryParam("entries") entries:Boolean):QueueStatusDTO = {
+    with_virtual_host(id) { host =>
+      val router:LocalRouter = host
+      val node = router.topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+      val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
+      monitoring(node) {
+        queue.status(entries)
+      }
+    }
+  }
+
   @GET @Path("virtual-hosts/{id}/queues")
   @Produces(Array("application/json"))
   def queues(@PathParam("id") id : String, @QueryParam("f") f:java.util.List[String],

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Fri Aug 26 15:18:35 2011
@@ -23,23 +23,35 @@
   - else
     - "%,.2f %%".format(n.toFloat*100.0/d)
 
-.breadcumbs
-  a(href={strip_resolve("..")+".html"}) Back
 
 - binding match
   - case x:QueueDestinationDTO =>
+    .breadcumbs
+      a(href={strip_resolve("..")+".html"}) Back
+
     h1 Queue #{id}
+    p state: #{state} #{ uptime(state_since) } ago
+    - if( state == "STARTED" )
+      form(method="post" action={path("action/delete")})
+        input(type="submit" value="delete")
   - case x:DurableSubscriptionDestinationDTO =>
+    .breadcumbs
+      a(href={strip_resolve("..")+".html"}) Back
+
     h1 Durable Subscription on #{id}
     - if( x.selector != null )
       p selector: ${x.selector}
+    p state: #{state} #{ uptime(state_since) } ago
+    - if( state == "STARTED" )
+      form(method="post" action={path("action/delete")})
+        input(type="submit" value="delete")
+
   - case _ =>
+    .breadcumbs
+      a(href={strip_resolve("../..")+".html"}) Back
     h1 Temporary Queue
+    p state: #{state} #{ uptime(state_since) } ago
 
-p state: #{state} #{ uptime(state_since) } ago
-- if( state == "STARTED" )
-  form(method="post" action={path("action/delete")})
-    input(type="submit" value="delete")
 
 h2 Current Size
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade Fri Aug 26 15:18:35 2011
@@ -44,6 +44,8 @@ ul
   - for( x <- consumers )
     li.consumer
       - x.kind match
+        - case "topic-queue" =>
+          a(href={ path("../../topic-queues/"+id+"/"+x.id+".html") }) #{x.label}
         - case "queue" =>
           a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
         - case "connection" =>