You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/08 21:17:12 UTC
svn commit: r1043655 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apoll...
Author: chirino
Date: Wed Dec 8 20:17:12 2010
New Revision: 1043655
URL: http://svn.apache.org/viewvc?rev=1043655&view=rev
Log:
- Adding initial support for using a temp queue to spool messages for slow topic consumers
- Updated admin interface so it view those temp queues too.
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java
- copied, changed from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java
- copied, changed from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java
- copied, changed from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index Wed Dec 8 20:17:12 2010
@@ -14,5 +14,6 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.PointToPointBindingFactory
-org.apache.activemq.apollo.broker.DurableSubBindingFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.QueueBindingFactory
+org.apache.activemq.apollo.broker.SubscriptionBindingFactory
+org.apache.activemq.apollo.broker.TempBindingFactory
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Wed Dec 8 20:17:12 2010
@@ -105,30 +105,30 @@ trait Binding {
def destination:Path
}
-object PointToPointBinding {
+object QueueBinding {
val POINT_TO_POINT_KIND = new AsciiBuffer("ptp")
val DESTINATION_PATH = new AsciiBuffer("default");
}
-import PointToPointBinding._
+import QueueBinding._
-class PointToPointBindingFactory extends BindingFactory.Provider {
+class QueueBindingFactory extends BindingFactory.Provider {
def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
if( binding_kind == POINT_TO_POINT_KIND ) {
- val dto = new PointToPointBindingDTO
+ val dto = new QueueBindingDTO
dto.destination = binding_data.ascii.toString
- new PointToPointBinding(binding_data, dto)
+ new QueueBinding(binding_data, dto)
} else {
null
}
}
def create(binding_dto:BindingDTO) = {
- if( binding_dto.isInstanceOf[PointToPointBindingDTO] ) {
- val ptp_dto = binding_dto.asInstanceOf[PointToPointBindingDTO]
+ if( binding_dto.isInstanceOf[QueueBindingDTO] ) {
+ val ptp_dto = binding_dto.asInstanceOf[QueueBindingDTO]
val data = new AsciiBuffer(ptp_dto.destination).buffer
- new PointToPointBinding(data, ptp_dto)
+ new QueueBinding(data, ptp_dto)
} else {
null
}
@@ -141,7 +141,7 @@ class PointToPointBindingFactory extends
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class PointToPointBinding(val binding_data:Buffer, val binding_dto:PointToPointBindingDTO) extends Binding {
+class QueueBinding(val binding_data:Buffer, val binding_dto:QueueBindingDTO) extends Binding {
val destination = DestinationParser.decode_path(binding_dto.destination)
def binding_kind = POINT_TO_POINT_KIND
@@ -163,30 +163,30 @@ class PointToPointBinding(val binding_da
override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
override def equals(o:Any):Boolean = o match {
- case x: PointToPointBinding => x.binding_data == binding_data
+ case x: QueueBinding => x.binding_data == binding_data
case _ => false
}
}
-object DurableSubBinding {
+object SubscriptionBinding {
val DURABLE_SUB_KIND = new AsciiBuffer("ds")
}
-import DurableSubBinding._
+import SubscriptionBinding._
-class DurableSubBindingFactory extends BindingFactory.Provider {
+class SubscriptionBindingFactory extends BindingFactory.Provider {
def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
if( binding_kind == DURABLE_SUB_KIND ) {
- new DurableSubBinding(binding_data, JsonCodec.decode(binding_data, classOf[DurableSubscriptionBindingDTO]))
+ new SubscriptionBinding(binding_data, JsonCodec.decode(binding_data, classOf[SubscriptionBindingDTO]))
} else {
null
}
}
def create(binding_dto:BindingDTO) = {
- if( binding_dto.isInstanceOf[DurableSubscriptionBindingDTO] ) {
- new DurableSubBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[DurableSubscriptionBindingDTO])
+ if( binding_dto.isInstanceOf[SubscriptionBindingDTO] ) {
+ new SubscriptionBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[SubscriptionBindingDTO])
} else {
null
}
@@ -200,7 +200,7 @@ class DurableSubBindingFactory extends B
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class DurableSubBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionBindingDTO) extends Binding {
+class SubscriptionBinding(val binding_data:Buffer, val binding_dto:SubscriptionBindingDTO) extends Binding {
val destination = DestinationParser.decode_path(binding_dto.destination)
@@ -229,7 +229,7 @@ class DurableSubBinding(val binding_data
override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
override def equals(o:Any):Boolean = o match {
- case x: DurableSubBinding => x.binding_data == binding_data
+ case x: SubscriptionBinding => x.binding_data == binding_data
case _ => false
}
@@ -248,4 +248,67 @@ class DurableSubBinding(val binding_data
rc = rc && (o(config.subscription_id).map{ x=> x == binding_dto.subscription_id }.getOrElse(true))
rc
}
-}
\ No newline at end of file
+}
+
+
+object TempBinding {
+ val TEMP_DATA = new AsciiBuffer("")
+ val TEMP_KIND = new AsciiBuffer("tmp")
+ val TEMP_DTO = new TempBindingDTO
+}
+
+import TempBinding._
+
+class TempBindingFactory extends BindingFactory.Provider {
+
+ def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
+ if( binding_kind == TEMP_KIND ) {
+ new TempBinding("", "")
+ } else {
+ null
+ }
+ }
+
+ def create(binding_dto:BindingDTO) = {
+ if( binding_dto.isInstanceOf[TempBindingDTO] ) {
+ new TempBinding("", "")
+ } else {
+ null
+ }
+ }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class TempBinding(val key:AnyRef, val label:String) extends Binding {
+ def this(c:DeliveryConsumer) = this(c, c.connection.map(_.transport.getRemoteAddress).getOrElse("known") )
+
+ val destination = null
+ def binding_kind = TEMP_KIND
+ def binding_dto = TEMP_DTO
+ def binding_data = TEMP_DATA
+
+ def unbind(node: RoutingNode, queue: Queue) = {
+ if( node.unified ) {
+ node.remove_broadcast_consumer(queue)
+ }
+ }
+
+ def bind(node: RoutingNode, queue: Queue) = {
+ if( node.unified ) {
+ node.add_broadcast_consumer(queue)
+ }
+ }
+
+ override def hashCode = if(key==null) 0 else key.hashCode
+
+ override def equals(o:Any):Boolean = o match {
+ case x: TempBinding => x.key == key
+ case _ => false
+ }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Dec 8 20:17:12 2010
@@ -28,9 +28,9 @@ import collection.mutable.{ListBuffer, H
import scala.collection.immutable.List
import org.apache.activemq.apollo.store.{StoreUOW, QueueRecord}
import Buffer._
-import org.apache.activemq.apollo.dto.{QueueDTO, PointToPointBindingDTO, BindingDTO}
import org.apache.activemq.apollo.util.path.{Path, Part, PathMap, PathParser}
import java.util.ArrayList
+import org.apache.activemq.apollo.dto._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -68,7 +68,8 @@ class Router(val host:VirtualHost) exten
protected def dispatchQueue:DispatchQueue = host.dispatchQueue
- var queues = HashMap[Binding, Queue]()
+ var queue_bindings = HashMap[Binding, Queue]()
+ var queues = HashMap[Long, Queue]()
// Only stores simple paths, used for wild card lookups.
var destinations = new PathMap[RoutingNode]()
@@ -134,9 +135,26 @@ class Router(val host:VirtualHost) exten
}
}.getOrElse(new QueueDTO)
- val queue = new Queue(host, id, binding, config)
+
+ var qid = id
+ if( qid == -1 ) {
+ qid = host.queue_id_counter.incrementAndGet
+ }
+
+ val queue = new Queue(host, qid, binding, config)
+ if( queue.tune_persistent && id == -1 ) {
+
+ val record = new QueueRecord
+ record.key = qid
+ record.binding_data = binding.binding_data
+ record.binding_kind = binding.binding_kind
+
+ host.store.addQueue(record) { rc => }
+
+ }
queue.start
- queues.put(binding, queue)
+ queue_bindings.put(binding, queue)
+ queues.put(queue.id, queue)
// Not all queues are bound to destinations.
val name = binding.destination
@@ -166,31 +184,26 @@ class Router(val host:VirtualHost) exten
*/
def _create_queue(dto: BindingDTO): Option[Queue] = {
val binding = BindingFactory.create(dto)
- val queue = queues.get(binding) match {
+ val queue = queue_bindings.get(binding) match {
case Some(queue) => Some(queue)
case None => Some(_create_queue(-1, binding))
}
queue
}
- def create_queue(dto:BindingDTO) = dispatchQueue ! {
- _create_queue(dto)
+ def create_queue(id:BindingDTO) = dispatchQueue ! {
+ _create_queue(id)
}
/**
* Returns true if the queue no longer exists.
*/
- def destroy_queue(dto:BindingDTO) = dispatchQueue ! {
- val binding = BindingFactory.create(dto)
- queues.get(binding) match {
+ def destroy_queue(dto:BindingDTO) = dispatchQueue ! { _destroy_queue(dto) }
+
+ def _destroy_queue(dto:BindingDTO):Boolean = {
+ queue_bindings.get(BindingFactory.create(dto)) match {
case Some(queue) =>
- val name = binding.destination
- if( name!=null ) {
- get_destination_matches(name).foreach( node=>
- node.remove_queue(queue)
- )
- }
- queue.stop
+ _destroy_queue(queue)
true
case None =>
true
@@ -198,11 +211,50 @@ class Router(val host:VirtualHost) exten
}
/**
+ * Returns true if the queue no longer exists.
+ */
+ def destroy_queue(id:Long) = dispatchQueue ! { _destroy_queue(id) }
+
+ def _destroy_queue(id:Long):Boolean = {
+ queues.get(id) match {
+ case Some(queue) =>
+ _destroy_queue(queue)
+ true
+ case None =>
+ true
+ }
+ }
+
+ def _destroy_queue(queue:Queue):Unit = {
+ queue_bindings.remove(queue.binding)
+ queues.remove(queue.id)
+
+ val name = queue.binding.destination
+ if( name!=null ) {
+ get_destination_matches(name).foreach( node=>
+ node.remove_queue(queue)
+ )
+ }
+ queue.stop
+ if( queue.tune_persistent ) {
+ queue.dispatchQueue ^ {
+ host.store.removeQueue(queue.id){x=>}
+ }
+ }
+ }
+
+ /**
* Gets an existing queue.
*/
def get_queue(dto:BindingDTO) = dispatchQueue ! {
- val binding = BindingFactory.create(dto)
- queues.get(binding)
+ queue_bindings.get(BindingFactory.create(dto))
+ }
+
+ /**
+ * Gets an existing queue.
+ */
+ def get_queue(id:Long) = dispatchQueue ! {
+ queues.get(id)
}
def bind(destination:Destination, consumer:DeliveryConsumer) = {
@@ -218,9 +270,9 @@ class Router(val host:VirtualHost) exten
val node = create_destination_or(name) { node=> Unit }
}
- get_destination_matches(name).foreach( node=>
+ get_destination_matches(name).foreach{ node=>
node.add_broadcast_consumer(consumer)
- )
+ }
broadcast_consumers.put(name, consumer)
}
}
@@ -249,7 +301,7 @@ class Router(val host:VirtualHost) exten
// Looking up the queue will cause it to get created if it does not exist.
val queue = if( !topic ) {
- val dto = new PointToPointBindingDTO
+ val dto = new QueueBindingDTO
dto.destination = DestinationParser.encode_path(destination.name)
_create_queue(dto)
} else {
@@ -280,11 +332,14 @@ class Router(val host:VirtualHost) exten
}
-
+object RoutingNode {
+ val DEFAULT_CONFIG = new DestinationDTO
+}
/**
* Tracks state associated with a destination name.
*/
class RoutingNode(val router:Router, val name:Path) {
+ import RoutingNode._
val id = router.destination_id_counter.incrementAndGet
@@ -292,31 +347,64 @@ class RoutingNode(val router:Router, val
var broadcast_consumers = ListBuffer[DeliveryConsumer]()
var queues = ListBuffer[Queue]()
- val unified = {
+ import OptionSupport._
+
+ val config = {
import collection.JavaConversions._
- import OptionSupport._
import DestinationParser.default._
-
- val t= router.host.config.destinations.find( x=> parseFilter(ascii(x.path)).matches(name) )
- t.flatMap(x=> o(x.unified)).getOrElse(false)
+ router.host.config.destinations.find( x=> parseFilter(ascii(x.path)).matches(name) ).getOrElse(DEFAULT_CONFIG)
}
+ def unified = config.unified.getOrElse(false)
+ def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
+
+ var consumer_proxies = Map[DeliveryConsumer, DeliveryConsumer]()
+
def add_broadcast_consumer (consumer:DeliveryConsumer) = {
- broadcast_consumers += consumer
- val list = consumer :: Nil
+ var target = consumer
+ slow_consumer_policy match {
+ case "queue" =>
+
+ // create a temp queue so that it can spool
+ val queue = router._create_queue(-1, new TempBinding(consumer))
+ queue.dispatchQueue.setTargetQueue(consumer.dispatchQueue)
+ queue.bind(List(consumer))
+
+ consumer_proxies += consumer->queue
+ target = queue
+
+ case "block" =>
+ // just have dispatcher dispatch directly to them..
+ }
+
+ broadcast_consumers += target
+ val list = target :: Nil
broadcast_producers.foreach({ r=>
r.bind(list)
})
}
def remove_broadcast_consumer (consumer:DeliveryConsumer) = {
- broadcast_consumers = broadcast_consumers.filterNot( _ == consumer )
- val list = consumer :: Nil
+ var target = consumer_proxies.get(consumer).getOrElse(consumer)
+
+ broadcast_consumers = broadcast_consumers.filterNot( _ == target )
+
+ val list = target :: Nil
broadcast_producers.foreach({ r=>
r.unbind(list)
})
+
+ target match {
+ case queue:Queue=>
+ val binding = new TempBinding(consumer)
+ if( queue.binding == binding ) {
+ queue.unbind(List(consumer))
+ router._destroy_queue(queue.id)
+ }
+ case _ =>
+ }
}
def add_broadcast_producer (producer:DeliveryProducerRoute) = {
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java Wed Dec 8 20:17:12 2010
@@ -27,7 +27,7 @@ import javax.xml.bind.annotation.*;
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@XmlType(name = "binding")
-@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
+@XmlSeeAlso({QueueBindingDTO.class, SubscriptionBindingDTO.class})
@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
@XmlAccessorType(XmlAccessType.FIELD)
public class BindingDTO {
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Wed Dec 8 20:17:12 2010
@@ -43,4 +43,6 @@ public class DestinationDTO extends Stri
@XmlAttribute
public Boolean unified;
+ public String slow_consumer_policy;
+
}
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java (from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java&r1=1043576&r2=1043655&rev=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java Wed Dec 8 20:17:12 2010
@@ -26,9 +26,14 @@ import javax.xml.bind.annotation.*;
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlType(name = "binding")
-@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@XmlRootElement(name = "queue-binding")
@XmlAccessorType(XmlAccessType.FIELD)
-public class BindingDTO {
+public class QueueBindingDTO extends BindingDTO {
+
+ /**
+ * A label that describes the binding
+ */
+ @XmlAttribute
+ public String destination;
+
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Wed Dec 8 20:17:12 2010
@@ -32,8 +32,8 @@ import java.util.List;
@XmlAccessorType(XmlAccessType.FIELD)
public class QueueStatusDTO extends LongIdDTO {
- @XmlAttribute
- public String label;
+ @XmlElement
+ public BindingDTO binding;
@XmlAttribute(name="enqueue-item-counter")
public long enqueue_item_counter;
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java (from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java&r1=1043576&r2=1043655&rev=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java Wed Dec 8 20:17:12 2010
@@ -16,9 +16,10 @@
*/
package org.apache.activemq.apollo.dto;
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
/**
* <p>
@@ -26,9 +27,17 @@ import javax.xml.bind.annotation.*;
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlType(name = "binding")
-@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@XmlRootElement(name = "subscription-binding")
@XmlAccessorType(XmlAccessType.FIELD)
-public class BindingDTO {
+public class SubscriptionBindingDTO extends BindingDTO {
+
+ public String destination;
+
+ public String filter;
+
+ @XmlAttribute(name="client-id")
+ public String client_id;
+
+ @XmlAttribute(name="subscription-id")
+ public String subscription_id;
}
\ No newline at end of file
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java (from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java&r1=1043576&r2=1043655&rev=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java Wed Dec 8 20:17:12 2010
@@ -16,9 +16,10 @@
*/
package org.apache.activemq.apollo.dto;
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
/**
* <p>
@@ -26,9 +27,7 @@ import javax.xml.bind.annotation.*;
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlType(name = "binding")
-@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@XmlRootElement(name = "temp-binding")
@XmlAccessorType(XmlAccessType.FIELD)
-public class BindingDTO {
+public class TempBindingDTO extends BindingDTO {
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Dec 8 20:17:12 2010
@@ -36,7 +36,7 @@ import org.apache.activemq.apollo.store.
import org.apache.activemq.apollo.util._
import java.util.concurrent.TimeUnit
import java.util.Map.Entry
-import org.apache.activemq.apollo.dto.{StompConnectionStatusDTO, BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
+import org.apache.activemq.apollo.dto.{StompConnectionStatusDTO, BindingDTO, SubscriptionBindingDTO, QueueBindingDTO}
import scala.util.continuations._
/**
@@ -460,6 +460,10 @@ class StompProtocolHandler extends Proto
connection.transport.resumeRead
}
+ def weird(headers:HeaderMap) = {
+ println("weird: "+headers)
+ }
+
def on_stomp_connect(headers:HeaderMap):Unit = {
security_context.user = get(headers, LOGIN).toString
@@ -542,7 +546,9 @@ class StompProtocolHandler extends Proto
} else {
val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
-
+ if( connection_sink==null ) {
+ weird(headers)
+ }
connection_sink.offer(
StompFrame(CONNECTED, List(
(VERSION, protocol_version),
@@ -742,7 +748,7 @@ class StompProtocolHandler extends Proto
// recover the queue on restart and rebind it the
// way again)
if (topic) {
- val rc = new DurableSubscriptionBindingDTO
+ val rc = new SubscriptionBindingDTO
rc.destination = DestinationParser.encode_path(destination.name)
// TODO:
// rc.client_id =
@@ -750,7 +756,7 @@ class StompProtocolHandler extends Proto
rc.filter = if (selector == null) null else selector._1
rc
} else {
- val rc = new PointToPointBindingDTO
+ val rc = new QueueBindingDTO
rc.destination = DestinationParser.encode_path(destination.name)
rc
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Dec 8 20:17:12 2010
@@ -26,6 +26,7 @@ import collection.JavaConversions
import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.broker._
import collection.mutable.ListBuffer
+import scala.util.continuations._
/**
* <p>
@@ -144,6 +145,14 @@ case class RuntimeResource(parent:Broker
link
}
+ def link(queue:Queue) = {
+ val link = new LinkDTO()
+ link.kind = "queue"
+ link.ref = queue.id.toString
+ link.label = queue.binding.label
+ link
+ }
+
@GET @Path("virtual-hosts/{id}/destinations/{dest}")
def destination(@PathParam("id") id : Long, @PathParam("dest") dest : Long):DestinationStatusDTO = {
with_virtual_host(id) { case (virtualHost,cb) =>
@@ -154,8 +163,15 @@ case class RuntimeResource(parent:Broker
node.queues.foreach { q=>
result.queues.add(new LongIdLabeledDTO(q.id, q.binding.label))
}
- node.broadcast_consumers.flatMap( _.connection ).foreach { connection=>
- result.consumers.add(link(connection))
+ node.broadcast_consumers.foreach { consumer=>
+ consumer match {
+ case queue:Queue =>
+ result.consumers.add(link(queue))
+ case _ =>
+ consumer.connection.foreach{c=>
+ result.consumers.add(link(c))
+ }
+ }
}
node.broadcast_producers.flatMap( _.producer.connection ).foreach { connection=>
result.producers.add(link(connection))
@@ -166,81 +182,96 @@ case class RuntimeResource(parent:Broker
}
}
+ @GET @Path("virtual-hosts/{id}/queues/{queue}")
+ def queue(@PathParam("id") id : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean):QueueStatusDTO = {
+ with_virtual_host(id) { case (virtualHost,cb) =>
+ reset {
+ val queue = virtualHost.router.get_queue(qid)
+ status(queue, entries, cb)
+ }
+ }
+ }
+
@GET @Path("virtual-hosts/{id}/destinations/{dest}/queues/{queue}")
- def queue(@PathParam("id") id : Long, @PathParam("dest") dest : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
+ def destination_queue(@PathParam("id") id : Long, @PathParam("dest") dest : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
with_virtual_host(id) { case (virtualHost,cb) =>
import JavaConversions._
- (virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=> node.queues.find { _.id == qid } }) match {
- case None=> cb(None)
- case Some(q) => q.dispatchQueue {
- val result = new QueueStatusDTO
- result.id = q.id
- result.label = q.binding.label
- result.capacity_used = q.capacity_used
- result.capacity = q.capacity
-
- result.enqueue_item_counter = q.enqueue_item_counter
- result.dequeue_item_counter = q.dequeue_item_counter
- result.enqueue_size_counter = q.enqueue_size_counter
- result.dequeue_size_counter = q.dequeue_size_counter
- result.nack_item_counter = q.nack_item_counter
- result.nack_size_counter = q.nack_size_counter
-
- result.queue_size = q.queue_size
- result.queue_items = q.queue_items
-
- result.loading_size = q.loading_size
- result.flushing_size = q.flushing_size
- result.flushed_items = q.flushed_items
-
- if( entries ) {
- var cur = q.head_entry
- while( cur!=null ) {
-
- val e = new EntryStatusDTO
- e.seq = cur.seq
- e.count = cur.count
- e.size = cur.size
- e.consumer_count = cur.parked.size
- e.is_prefetched = cur.is_prefetched
- e.state = cur.label
-
- result.entries.add(e)
-
- cur = if( cur == q.tail_entry ) {
- null
- } else {
- cur.nextOrTail
- }
- }
- }
+ val queue = virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=> node.queues.find { _.id == qid } }
+ status(queue, entries, cb)
+ }
+ }
- q.inbound_sessions.flatMap( _.producer.connection ).foreach { connection=>
- result.producers.add(link(connection))
- }
- q.all_subscriptions.valuesIterator.toSeq.foreach{ sub =>
- val status = new QueueConsumerStatusDTO
- sub.consumer.connection.foreach(x=> status.link = link(x))
- status.total_dispatched_count = sub.total_dispatched_count
- status.total_dispatched_size = sub.total_dispatched_size
- status.total_ack_count = sub.total_ack_count
- status.total_nack_count = sub.total_nack_count
- status.acquired_size = sub.acquired_size
- status.acquired_count = sub.acquired_count
- status.waiting_on = if( sub.full ) {
- "ack"
- } else if( !sub.pos.is_loaded ) {
- "load"
- } else if( !sub.pos.is_tail ) {
- "producer"
- } else {
- "dispatch"
- }
- result.consumers.add(status)
+ def status(qo:Option[Queue], entries:Boolean=false, cb:Option[QueueStatusDTO]=>Unit):Unit = if(qo==None) {
+ cb(None)
+ } else {
+ val q = qo.get
+ q.dispatchQueue {
+ val rc = new QueueStatusDTO
+ rc.id = q.id
+ rc.binding = q.binding.binding_dto
+ rc.capacity_used = q.capacity_used
+ rc.capacity = q.capacity
+
+ rc.enqueue_item_counter = q.enqueue_item_counter
+ rc.dequeue_item_counter = q.dequeue_item_counter
+ rc.enqueue_size_counter = q.enqueue_size_counter
+ rc.dequeue_size_counter = q.dequeue_size_counter
+ rc.nack_item_counter = q.nack_item_counter
+ rc.nack_size_counter = q.nack_size_counter
+
+ rc.queue_size = q.queue_size
+ rc.queue_items = q.queue_items
+
+ rc.loading_size = q.loading_size
+ rc.flushing_size = q.flushing_size
+ rc.flushed_items = q.flushed_items
+
+ if( entries ) {
+ var cur = q.head_entry
+ while( cur!=null ) {
+
+ val e = new EntryStatusDTO
+ e.seq = cur.seq
+ e.count = cur.count
+ e.size = cur.size
+ e.consumer_count = cur.parked.size
+ e.is_prefetched = cur.is_prefetched
+ e.state = cur.label
+
+ rc.entries.add(e)
+
+ cur = if( cur == q.tail_entry ) {
+ null
+ } else {
+ cur.nextOrTail
}
- cb(Some(result))
}
}
+
+ q.inbound_sessions.flatMap( _.producer.connection ).foreach { connection=>
+ rc.producers.add(link(connection))
+ }
+ q.all_subscriptions.valuesIterator.toSeq.foreach{ sub =>
+ val status = new QueueConsumerStatusDTO
+ sub.consumer.connection.foreach(x=> status.link = link(x))
+ status.total_dispatched_count = sub.total_dispatched_count
+ status.total_dispatched_size = sub.total_dispatched_size
+ status.total_ack_count = sub.total_ack_count
+ status.total_nack_count = sub.total_nack_count
+ status.acquired_size = sub.acquired_size
+ status.acquired_count = sub.acquired_count
+ status.waiting_on = if( sub.full ) {
+ "ack"
+ } else if( sub.pos.is_tail ) {
+ "producer"
+ } else if( !sub.pos.is_loaded ) {
+ "load"
+ } else {
+ "dispatch"
+ }
+ rc.consumers.add(status)
+ }
+ cb(Some(rc))
}
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade Wed Dec 8 20:17:12 2010
@@ -22,13 +22,15 @@
h1 Destination: #{name}
-h2 Queues
+h3 Queue Domain
ul
- for( x <- queues )
li
- a(href={ path("queues/"+x.id) }) #{x.label}
+ a(href={ path("../../queues/"+x.id) }) #{x.label}
-h3 Broadcast Producers
+h3 Topic Domain
+
+h4 Publishers
ul
- for( x <- producers )
- x.kind match
@@ -37,10 +39,13 @@ ul
a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
- case _ =>
-h3 Broadcast Consumers
+h4 Subscribers
ul
- for( x <- consumers )
- x.kind match
+ - case "queue" =>
+ li
+ a(href={ path("../../queues/"+x.ref) }) #{x.label}
- case "connection" =>
li
a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Wed Dec 8 20:17:12 2010
@@ -20,7 +20,21 @@
.breadcumbs
a(href={strip_resolve("..")}) Back
-h1 Queue: #{label}
+- binding match
+ - case x:QueueBindingDTO =>
+ h1 Queue #{x.destination}
+
+ - case x:SubscriptionBindingDTO =>
+ h1 Durable Subscription on #{x.destination}
+ p client id: ${x.client_id}
+ p subscription id: ${x.subscription_id}
+ p filter: ${x.filter}
+
+ - case x:TempBindingDTO =>
+ h1 Temporary Queue
+
+ - case x =>
+ h1 Unknown Queue Type: #{x.getClass.getName}
h2 Current Size
@@ -51,7 +65,7 @@ ul
- x.kind match
- case "connection" =>
li.producer
- a(href={ path("../../../../../../connections/"+x.ref) }) #{x.label}
+ a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
- case _ =>
@@ -61,7 +75,7 @@ ul
- import consumer._
li.consumer
- if( link !=null )
- a(href={ path("../../../../../../connections/"+link.ref ) }) #{link.label}
+ a(href={ path("../../../../connections/"+link.ref ) }) #{link.label}
p next message seq: #{position}
p acquired: #{acquired_count} messages (#{memory(acquired_size)})