You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/26 17:18:36 UTC
svn commit: r1162139 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-stomp/src/main/scala/org/apache/activemq/ap...
Author: chirino
Date: Fri Aug 26 15:18:35 2011
New Revision: 1162139
URL: http://svn.apache.org/viewvc?rev=1162139&view=rev
Log:
Fix NPE that occurred when you used queues to spool slow consumers on topics, also setup a proper link to that queue via the REST API.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index Fri Aug 26 15:18:35 2011
@@ -16,4 +16,3 @@
## ---------------------------------------------------------------------------
org.apache.activemq.apollo.broker.QueueDomainQueueBinding
org.apache.activemq.apollo.broker.DurableSubscriptionQueueBinding
-org.apache.activemq.apollo.broker.TempQueueBinding
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Fri Aug 26 15:18:35 2011
@@ -250,20 +250,19 @@ class DurableSubscriptionQueueBinding(va
}
-object TempQueueBinding extends BindingFactory {
+object TempQueueBinding {
val TEMP_DATA = new AsciiBuffer("")
val TEMP_KIND = new AsciiBuffer("tmp")
- val TEMP_DTO = null
- def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
- if( binding_kind == TEMP_KIND ) {
- new TempQueueBinding("", "")
- } else {
- null
- }
- }
-
- def create(binding_dto:DestinationDTO) = throw new UnsupportedOperationException
+// def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
+// if( binding_kind == TEMP_KIND ) {
+// new TempQueueBinding("", "")
+// } else {
+// null
+// }
+// }
+//
+// def create(binding_dto:DestinationDTO) = throw new UnsupportedOperationException
}
/**
@@ -272,28 +271,24 @@ object TempQueueBinding extends BindingF
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class TempQueueBinding(val key:AnyRef, val id:String) extends Binding {
+class TempQueueBinding(val key:AnyRef, val destination:Path, val binding_dto:DestinationDTO) extends Binding {
import TempQueueBinding._
- def this(c:DeliveryConsumer) = this(c, c.connection.map(_.transport.getRemoteAddress.toString).getOrElse("known") )
-
- val destination = null
def binding_kind = TEMP_KIND
- def binding_dto = TEMP_DTO
def binding_data = TEMP_DATA
- def unbind(router: LocalRouter, queue: Queue) = {
- }
-
- def bind(router: LocalRouter, queue: Queue) = {
- }
+ def unbind(router: LocalRouter, queue: Queue) = {}
+ def bind(router: LocalRouter, queue: Queue) = {}
override def hashCode = if(key==null) 0 else key.hashCode
+ def id = key.toString
+
+ def config(host: VirtualHost) = new QueueDTO
+
override def equals(o:Any):Boolean = o match {
case x: TempQueueBinding => x.key == key
case _ => false
}
- def config(host: VirtualHost) = new QueueDTO
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Fri Aug 26 15:18:35 2011
@@ -375,7 +375,7 @@ class LocalRouter(val virtual_host:Virtu
durable_subscriptions_by_path.remove(path, queue)
var matches = get_destination_matches(path)
matches.foreach( _.unbind_durable_subscription(destination, queue) )
- _destroy_queue(queue.id, null)
+ _destroy_queue(queue)
}
}
@@ -457,7 +457,7 @@ class LocalRouter(val virtual_host:Virtu
for( queue <- dest.durable_subscriptions ) {
// we delete the durable sub if it's not wildcard'ed
if( !PathParser.containsWildCards(queue.binding.destination) ) {
- _destroy_queue(queue.id, null)
+ _destroy_queue(queue)
}
}
@@ -465,7 +465,7 @@ class LocalRouter(val virtual_host:Virtu
consumer match {
case queue:Queue =>
// Delete any attached queue consumers..
- _destroy_queue(queue.id, null)
+ _destroy_queue(queue)
}
}
@@ -928,7 +928,7 @@ class LocalRouter(val virtual_host:Virtu
protected def _stop(on_completed: Runnable) = {
// val tracker = new LoggingTracker("router shutdown", virtual_host.console_log, dispatch_queue)
- queues_by_id.valuesIterator.foreach { queue=>
+ queues_by_store_id.valuesIterator.foreach { queue=>
queue.stop
// tracker.stop(queue)
}
@@ -1118,7 +1118,7 @@ class LocalRouter(val virtual_host:Virtu
/////////////////////////////////////////////////////////////////////////////
var queues_by_binding = LinkedHashMap[Binding, Queue]()
- var queues_by_id = LinkedHashMap[String, Queue]()
+ var queues_by_store_id = LinkedHashMap[Long, Queue]()
/**
* Gets an existing queue.
@@ -1130,8 +1130,8 @@ class LocalRouter(val virtual_host:Virtu
/**
* Gets an existing queue.
*/
- def get_queue(id:String) = dispatch_queue ! {
- queues_by_id.get(id)
+ def get_queue(id:Long) = dispatch_queue ! {
+ queues_by_store_id.get(id)
}
@@ -1148,7 +1148,7 @@ class LocalRouter(val virtual_host:Virtu
queue.start
queues_by_binding.put(binding, queue)
- queues_by_id.put(queue.id, queue)
+ queues_by_store_id.put(qid, queue)
// this causes the queue to get registered in the right location in
// the router.
@@ -1159,10 +1159,10 @@ class LocalRouter(val virtual_host:Virtu
/**
* Returns true if the queue no longer exists.
*/
- def destroy_queue(id:String, security:SecurityContext) = dispatch_queue ! { _destroy_queue(id,security) }
+ def destroy_queue(id:Long, security:SecurityContext) = dispatch_queue ! { _destroy_queue(id,security) }
- def _destroy_queue(id:String, security:SecurityContext):Option[String] = {
- queues_by_id.get(id) match {
+ def _destroy_queue(id:Long, security:SecurityContext):Option[String] = {
+ queues_by_store_id.get(id) match {
case Some(queue) =>
_destroy_queue(queue,security)
case None =>
@@ -1201,7 +1201,7 @@ class LocalRouter(val virtual_host:Virtu
queue.binding.unbind(this, queue)
queues_by_binding.remove(queue.binding)
- queues_by_id.remove(queue.id)
+ queues_by_store_id.remove(queue.store_id)
if (queue.tune_persistent) {
queue.dispatch_queue {
virtual_host.store.remove_queue(queue.store_id) {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Aug 26 15:18:35 2011
@@ -46,7 +46,6 @@ import Queue._
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class Queue(val router: LocalRouter, val store_id:Long, var binding:Binding, var config:QueueDTO) extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService with DomainDestination with Dispatched {
-
def id = binding.id
override def toString = binding.destination.toString
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Aug 26 15:18:35 2011
@@ -32,7 +32,7 @@ trait Router extends Service {
def virtual_host:VirtualHost
- def get_queue(dto:String):Option[Queue] @suspendable
+ def get_queue(dto:Long):Option[Queue] @suspendable
def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext) : Option[String] @suspendable
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Fri Aug 26 15:18:35 2011
@@ -355,7 +355,7 @@ class Session[T](val producer_queue:Disp
@volatile
var enqueue_size_counter = 0L
@volatile
- var enqueue_ts = 0L
+ var enqueue_ts = mux.time_stamp
// create a source to coalesce credit events back to the producer side...
val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Fri Aug 26 15:18:35 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit
import org.fusesource.hawtdispatch._
import collection.mutable.{HashSet, HashMap, ListBuffer}
import java.lang.Long
+import com.sun.jdi.connect.spi.TransportService.ListenKey
/**
* <p>
@@ -134,7 +135,7 @@ class Topic(val router:LocalRouter, val
rc.metrics.producer_counter = producer_counter
rc.metrics.consumer_counter = consumer_counter
rc.metrics.producer_count = producers.size
- rc.metrics.consumer_counter = consumers.size
+ rc.metrics.consumer_count = consumers.size
this.durable_subscriptions.foreach { q =>
rc.dsubs.add(q.id)
@@ -247,7 +248,7 @@ class Topic(val router:LocalRouter, val
case "queue" =>
// create a temp queue so that it can spool
- val queue = router._create_queue(new TempQueueBinding(consumer))
+ val queue = router._create_queue(new TempQueueBinding(consumer, path, destination_dto))
queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
queue.bind(List(consumer))
consumer_queues += consumer->queue
@@ -260,25 +261,37 @@ class Topic(val router:LocalRouter, val
}
val link = new LinkDTO()
+ link.kind = "unknown"
+ link.label = "unknown"
+ link.enqueue_ts = now
target match {
case queue:Queue =>
- link.kind = "queue"
- link.id = queue.id
- link.label = queue.id
+ queue.binding match {
+ case x:TempQueueBinding =>
+ link.kind = "topic-queue"
+ link.id = queue.store_id.toString()
+ x.key match {
+ case target:DeliveryConsumer=>
+ for(connection <- target.connection) {
+ link.label = connection.transport.getRemoteAddress.toString
+ }
+ case _ =>
+ }
+ case x:QueueDomainQueueBinding =>
+ link.kind = "queue"
+ link.id = queue.id
+ link.label = queue.id
+ }
case _ =>
- target.connection match {
- case Some(connection) =>
- link.kind = "connection"
- link.id = connection.id.toString
- link.label = connection.transport.getRemoteAddress.toString
- case _ =>
- link.kind = "unknown"
- link.label = "unknown"
+ for(connection <- target.connection) {
+ link.kind = "connection"
+ link.id = connection.id.toString
+ link.label = connection.transport.getRemoteAddress.toString
}
}
val proxy = ProxyDeliveryConsumer(target, link)
- consumers.put(target, proxy)
+ consumers.put(consumer, proxy)
consumer_counter += 1
val list = proxy :: Nil
producers.keys.foreach({ r=>
@@ -296,7 +309,7 @@ class Topic(val router:LocalRouter, val
queue.unbind(List(consumer))
queue.binding match {
case x:TempQueueBinding =>
- router._destroy_queue(queue.id, null)
+ router._destroy_queue(queue)
}
List(queue)
case None =>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Aug 26 15:18:35 2011
@@ -693,6 +693,11 @@ class StompProtocolHandler extends Proto
} catch {
case e: Break =>
case e:Exception =>
+ // To avoid double logging to the same log category..
+ if( connection_log!=StompProtocolHandler ) {
+ // but we also want the error on the apollo.log file.
+ warn("Internal Server Error", e)
+ }
async_die("Internal Server Error", e);
}
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Fri Aug 26 15:18:35 2011
@@ -561,6 +561,18 @@ case class BrokerResource() extends Reso
}
}
+ @GET @Path("virtual-hosts/{id}/topic-queues/{name:.*}/{qid}")
+ def topic(@PathParam("id") id : String,@PathParam("name") name : String, @PathParam("qid") qid : Long, @QueryParam("entries") entries:Boolean):QueueStatusDTO = {
+ with_virtual_host(id) { host =>
+ val router:LocalRouter = host
+ val node = router.topic_domain.destination_by_id.get(name).getOrElse(result(NOT_FOUND))
+ val queue =router.queues_by_store_id.get(qid).getOrElse(result(NOT_FOUND))
+ monitoring(node) {
+ queue.status(entries)
+ }
+ }
+ }
+
@GET @Path("virtual-hosts/{id}/queues")
@Produces(Array("application/json"))
def queues(@PathParam("id") id : String, @QueryParam("f") f:java.util.List[String],
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Fri Aug 26 15:18:35 2011
@@ -23,23 +23,35 @@
- else
- "%,.2f %%".format(n.toFloat*100.0/d)
-.breadcumbs
- a(href={strip_resolve("..")+".html"}) Back
- binding match
- case x:QueueDestinationDTO =>
+ .breadcumbs
+ a(href={strip_resolve("..")+".html"}) Back
+
h1 Queue #{id}
+ p state: #{state} #{ uptime(state_since) } ago
+ - if( state == "STARTED" )
+ form(method="post" action={path("action/delete")})
+ input(type="submit" value="delete")
- case x:DurableSubscriptionDestinationDTO =>
+ .breadcumbs
+ a(href={strip_resolve("..")+".html"}) Back
+
h1 Durable Subscription on #{id}
- if( x.selector != null )
p selector: ${x.selector}
+ p state: #{state} #{ uptime(state_since) } ago
+ - if( state == "STARTED" )
+ form(method="post" action={path("action/delete")})
+ input(type="submit" value="delete")
+
- case _ =>
+ .breadcumbs
+ a(href={strip_resolve("../..")+".html"}) Back
h1 Temporary Queue
+ p state: #{state} #{ uptime(state_since) } ago
-p state: #{state} #{ uptime(state_since) } ago
-- if( state == "STARTED" )
- form(method="post" action={path("action/delete")})
- input(type="submit" value="delete")
h2 Current Size
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade?rev=1162139&r1=1162138&r2=1162139&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/TopicStatusDTO.jade Fri Aug 26 15:18:35 2011
@@ -44,6 +44,8 @@ ul
- for( x <- consumers )
li.consumer
- x.kind match
+ - case "topic-queue" =>
+ a(href={ path("../../topic-queues/"+id+"/"+x.id+".html") }) #{x.label}
- case "queue" =>
a(href={ path("../../queues/"+x.id+".html") }) #{x.label}
- case "connection" =>