You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/08 21:17:12 UTC

svn commit: r1043655 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apoll...

Author: chirino
Date: Wed Dec  8 20:17:12 2010
New Revision: 1043655

URL: http://svn.apache.org/viewvc?rev=1043655&view=rev
Log:
- Adding initial support for using a temp queue to spool messages for slow topic consumers
- Updated the admin interface so it can view those temp queues too.

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java
      - copied, changed from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java
      - copied, changed from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java
      - copied, changed from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/binding-factory.index Wed Dec  8 20:17:12 2010
@@ -14,5 +14,6 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.PointToPointBindingFactory
-org.apache.activemq.apollo.broker.DurableSubBindingFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.QueueBindingFactory
+org.apache.activemq.apollo.broker.SubscriptionBindingFactory
+org.apache.activemq.apollo.broker.TempBindingFactory
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Wed Dec  8 20:17:12 2010
@@ -105,30 +105,30 @@ trait Binding {
   def destination:Path
 }
 
-object PointToPointBinding {
+object QueueBinding {
   val POINT_TO_POINT_KIND = new AsciiBuffer("ptp")
   val DESTINATION_PATH = new AsciiBuffer("default");
 }
 
-import PointToPointBinding._
+import QueueBinding._
 
-class PointToPointBindingFactory extends BindingFactory.Provider {
+class QueueBindingFactory extends BindingFactory.Provider {
 
   def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
     if( binding_kind == POINT_TO_POINT_KIND ) {
-      val dto = new PointToPointBindingDTO
+      val dto = new QueueBindingDTO
       dto.destination = binding_data.ascii.toString
-      new PointToPointBinding(binding_data, dto)
+      new QueueBinding(binding_data, dto)
     } else {
       null
     }
   }
 
   def create(binding_dto:BindingDTO) = {
-    if( binding_dto.isInstanceOf[PointToPointBindingDTO] ) {
-      val ptp_dto = binding_dto.asInstanceOf[PointToPointBindingDTO]
+    if( binding_dto.isInstanceOf[QueueBindingDTO] ) {
+      val ptp_dto = binding_dto.asInstanceOf[QueueBindingDTO]
       val data = new AsciiBuffer(ptp_dto.destination).buffer
-      new PointToPointBinding(data, ptp_dto)
+      new QueueBinding(data, ptp_dto)
     } else {
       null
     }
@@ -141,7 +141,7 @@ class PointToPointBindingFactory extends
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class PointToPointBinding(val binding_data:Buffer, val binding_dto:PointToPointBindingDTO) extends Binding {
+class QueueBinding(val binding_data:Buffer, val binding_dto:QueueBindingDTO) extends Binding {
 
   val destination = DestinationParser.decode_path(binding_dto.destination)
   def binding_kind = POINT_TO_POINT_KIND
@@ -163,30 +163,30 @@ class PointToPointBinding(val binding_da
   override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
 
   override def equals(o:Any):Boolean = o match {
-    case x: PointToPointBinding => x.binding_data == binding_data
+    case x: QueueBinding => x.binding_data == binding_data
     case _ => false
   }
 
 }
 
 
-object DurableSubBinding {
+object SubscriptionBinding {
   val DURABLE_SUB_KIND = new AsciiBuffer("ds")
 }
 
-import DurableSubBinding._
+import SubscriptionBinding._
 
-class DurableSubBindingFactory extends BindingFactory.Provider {
+class SubscriptionBindingFactory extends BindingFactory.Provider {
   def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
     if( binding_kind == DURABLE_SUB_KIND ) {
-      new DurableSubBinding(binding_data, JsonCodec.decode(binding_data, classOf[DurableSubscriptionBindingDTO]))
+      new SubscriptionBinding(binding_data, JsonCodec.decode(binding_data, classOf[SubscriptionBindingDTO]))
     } else {
       null
     }
   }
   def create(binding_dto:BindingDTO) = {
-    if( binding_dto.isInstanceOf[DurableSubscriptionBindingDTO] ) {
-      new DurableSubBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[DurableSubscriptionBindingDTO])
+    if( binding_dto.isInstanceOf[SubscriptionBindingDTO] ) {
+      new SubscriptionBinding(JsonCodec.encode(binding_dto), binding_dto.asInstanceOf[SubscriptionBindingDTO])
     } else {
       null
     }
@@ -200,7 +200,7 @@ class DurableSubBindingFactory extends B
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class DurableSubBinding(val binding_data:Buffer, val binding_dto:DurableSubscriptionBindingDTO) extends Binding {
+class SubscriptionBinding(val binding_data:Buffer, val binding_dto:SubscriptionBindingDTO) extends Binding {
 
   val destination = DestinationParser.decode_path(binding_dto.destination)
 
@@ -229,7 +229,7 @@ class DurableSubBinding(val binding_data
   override def hashCode = binding_kind.hashCode ^ binding_data.hashCode
 
   override def equals(o:Any):Boolean = o match {
-    case x: DurableSubBinding => x.binding_data == binding_data
+    case x: SubscriptionBinding => x.binding_data == binding_data
     case _ => false
   }
 
@@ -248,4 +248,67 @@ class DurableSubBinding(val binding_data
     rc = rc && (o(config.subscription_id).map{ x=> x == binding_dto.subscription_id }.getOrElse(true))
     rc
   }
-}
\ No newline at end of file
+}
+
+
+object TempBinding {
+  val TEMP_DATA = new AsciiBuffer("")
+  val TEMP_KIND = new AsciiBuffer("tmp")
+  val TEMP_DTO = new TempBindingDTO
+}
+
+import TempBinding._
+
+class TempBindingFactory extends BindingFactory.Provider {
+
+  def create(binding_kind:AsciiBuffer, binding_data:Buffer) = {
+    if( binding_kind == TEMP_KIND ) {
+      new TempBinding("", "")
+    } else {
+      null
+    }
+  }
+
+  def create(binding_dto:BindingDTO) = {
+    if( binding_dto.isInstanceOf[TempBindingDTO] ) {
+      new TempBinding("", "")
+    } else {
+      null
+    }
+  }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class TempBinding(val key:AnyRef, val label:String) extends Binding {
+  def this(c:DeliveryConsumer) = this(c, c.connection.map(_.transport.getRemoteAddress).getOrElse("known") )
+
+  val destination = null
+  def binding_kind = TEMP_KIND
+  def binding_dto = TEMP_DTO
+  def binding_data = TEMP_DATA
+
+  def unbind(node: RoutingNode, queue: Queue) = {
+    if( node.unified ) {
+      node.remove_broadcast_consumer(queue)
+    }
+  }
+
+  def bind(node: RoutingNode, queue: Queue) = {
+    if( node.unified ) {
+      node.add_broadcast_consumer(queue)
+    }
+  }
+
+  override def hashCode = if(key==null) 0 else key.hashCode
+
+  override def equals(o:Any):Boolean = o match {
+    case x: TempBinding => x.key == key
+    case _ => false
+  }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Dec  8 20:17:12 2010
@@ -28,9 +28,9 @@ import collection.mutable.{ListBuffer, H
 import scala.collection.immutable.List
 import org.apache.activemq.apollo.store.{StoreUOW, QueueRecord}
 import Buffer._
-import org.apache.activemq.apollo.dto.{QueueDTO, PointToPointBindingDTO, BindingDTO}
 import org.apache.activemq.apollo.util.path.{Path, Part, PathMap, PathParser}
 import java.util.ArrayList
+import org.apache.activemq.apollo.dto._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -68,7 +68,8 @@ class Router(val host:VirtualHost) exten
 
   protected def dispatchQueue:DispatchQueue = host.dispatchQueue
 
-  var queues = HashMap[Binding, Queue]()
+  var queue_bindings = HashMap[Binding, Queue]()
+  var queues = HashMap[Long, Queue]()
 
   // Only stores simple paths, used for wild card lookups.
   var destinations = new PathMap[RoutingNode]()
@@ -134,9 +135,26 @@ class Router(val host:VirtualHost) exten
       }
     }.getOrElse(new QueueDTO)
 
-    val queue = new Queue(host, id, binding, config)
+
+    var qid = id
+    if( qid == -1 ) {
+      qid = host.queue_id_counter.incrementAndGet
+    }
+
+    val queue = new Queue(host, qid, binding, config)
+    if( queue.tune_persistent && id == -1 ) {
+
+      val record = new QueueRecord
+      record.key = qid
+      record.binding_data = binding.binding_data
+      record.binding_kind = binding.binding_kind
+
+      host.store.addQueue(record) { rc =>  }
+
+    }
     queue.start
-    queues.put(binding, queue)
+    queue_bindings.put(binding, queue)
+    queues.put(queue.id, queue)
 
     // Not all queues are bound to destinations.
     val name = binding.destination
@@ -166,31 +184,26 @@ class Router(val host:VirtualHost) exten
    */
   def _create_queue(dto: BindingDTO): Option[Queue] = {
     val binding = BindingFactory.create(dto)
-    val queue = queues.get(binding) match {
+    val queue = queue_bindings.get(binding) match {
       case Some(queue) => Some(queue)
       case None => Some(_create_queue(-1, binding))
     }
     queue
   }
 
-  def create_queue(dto:BindingDTO) = dispatchQueue ! {
-    _create_queue(dto)
+  def create_queue(id:BindingDTO) = dispatchQueue ! {
+    _create_queue(id)
   }
 
   /**
    * Returns true if the queue no longer exists.
    */
-  def destroy_queue(dto:BindingDTO) = dispatchQueue ! {
-    val binding = BindingFactory.create(dto)
-    queues.get(binding) match {
+  def destroy_queue(dto:BindingDTO) = dispatchQueue ! { _destroy_queue(dto) }
+
+  def _destroy_queue(dto:BindingDTO):Boolean = {
+    queue_bindings.get(BindingFactory.create(dto)) match {
       case Some(queue) =>
-        val name = binding.destination
-        if( name!=null ) {
-          get_destination_matches(name).foreach( node=>
-            node.remove_queue(queue)
-          )
-        }
-        queue.stop
+        _destroy_queue(queue)
         true
       case None =>
         true
@@ -198,11 +211,50 @@ class Router(val host:VirtualHost) exten
   }
 
   /**
+   * Returns true if the queue no longer exists.
+   */
+  def destroy_queue(id:Long) = dispatchQueue ! { _destroy_queue(id) }
+
+  def _destroy_queue(id:Long):Boolean = {
+    queues.get(id) match {
+      case Some(queue) =>
+        _destroy_queue(queue)
+        true
+      case None =>
+        true
+    }
+  }
+
+  def _destroy_queue(queue:Queue):Unit = {
+    queue_bindings.remove(queue.binding)
+    queues.remove(queue.id)
+
+    val name = queue.binding.destination
+    if( name!=null ) {
+      get_destination_matches(name).foreach( node=>
+        node.remove_queue(queue)
+      )
+    }
+    queue.stop
+    if( queue.tune_persistent ) {
+      queue.dispatchQueue ^ {
+        host.store.removeQueue(queue.id){x=>}
+      }
+    }
+  }
+
+  /**
    * Gets an existing queue.
    */
   def get_queue(dto:BindingDTO) = dispatchQueue ! {
-    val binding = BindingFactory.create(dto)
-    queues.get(binding)
+    queue_bindings.get(BindingFactory.create(dto))
+  }
+
+  /**
+   * Gets an existing queue.
+   */
+  def get_queue(id:Long) = dispatchQueue ! {
+    queues.get(id)
   }
 
   def bind(destination:Destination, consumer:DeliveryConsumer) = {
@@ -218,9 +270,9 @@ class Router(val host:VirtualHost) exten
         val node = create_destination_or(name) { node=> Unit }
       }
 
-      get_destination_matches(name).foreach( node=>
+      get_destination_matches(name).foreach{ node=>
         node.add_broadcast_consumer(consumer)
-      )
+      }
       broadcast_consumers.put(name, consumer)
     }
   }
@@ -249,7 +301,7 @@ class Router(val host:VirtualHost) exten
 
       // Looking up the queue will cause it to get created if it does not exist.
       val queue = if( !topic ) {
-        val dto = new PointToPointBindingDTO
+        val dto = new QueueBindingDTO
         dto.destination = DestinationParser.encode_path(destination.name)
         _create_queue(dto)
       } else {
@@ -280,11 +332,14 @@ class Router(val host:VirtualHost) exten
 
 }
 
-
+object RoutingNode {
+  val DEFAULT_CONFIG = new DestinationDTO
+}
 /**
  * Tracks state associated with a destination name.
  */
 class RoutingNode(val router:Router, val name:Path) {
+  import RoutingNode._
 
   val id = router.destination_id_counter.incrementAndGet
 
@@ -292,31 +347,64 @@ class RoutingNode(val router:Router, val
   var broadcast_consumers = ListBuffer[DeliveryConsumer]()
   var queues = ListBuffer[Queue]()
 
-  val unified = {
+  import OptionSupport._
+
+  val config = {
     import collection.JavaConversions._
-    import OptionSupport._
     import DestinationParser.default._
-
-    val t= router.host.config.destinations.find( x=> parseFilter(ascii(x.path)).matches(name) )
-    t.flatMap(x=> o(x.unified)).getOrElse(false)
+    router.host.config.destinations.find( x=> parseFilter(ascii(x.path)).matches(name) ).getOrElse(DEFAULT_CONFIG)
   }
 
+  def unified = config.unified.getOrElse(false)
+  def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
+
+  var consumer_proxies = Map[DeliveryConsumer, DeliveryConsumer]()
+
   def add_broadcast_consumer (consumer:DeliveryConsumer) = {
-    broadcast_consumers += consumer
 
-    val list = consumer :: Nil
+    var target = consumer
+    slow_consumer_policy match {
+      case "queue" =>
+
+        // create a temp queue so that it can spool
+        val queue = router._create_queue(-1, new TempBinding(consumer))
+        queue.dispatchQueue.setTargetQueue(consumer.dispatchQueue)
+        queue.bind(List(consumer))
+
+        consumer_proxies += consumer->queue
+        target = queue
+
+      case "block" =>
+        // just have dispatcher dispatch directly to them..
+    }
+
+    broadcast_consumers += target
+    val list = target :: Nil
     broadcast_producers.foreach({ r=>
       r.bind(list)
     })
   }
 
   def remove_broadcast_consumer (consumer:DeliveryConsumer) = {
-    broadcast_consumers = broadcast_consumers.filterNot( _ == consumer )
 
-    val list = consumer :: Nil
+    var target = consumer_proxies.get(consumer).getOrElse(consumer)
+
+    broadcast_consumers = broadcast_consumers.filterNot( _ == target )
+
+    val list = target :: Nil
     broadcast_producers.foreach({ r=>
       r.unbind(list)
     })
+
+    target match {
+      case queue:Queue=>
+        val binding = new TempBinding(consumer)
+        if( queue.binding == binding ) {
+          queue.unbind(List(consumer))
+          router._destroy_queue(queue.id)
+        }
+      case _ =>
+    }
   }
 
   def add_broadcast_producer (producer:DeliveryProducerRoute) = {

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java Wed Dec  8 20:17:12 2010
@@ -27,7 +27,7 @@ import javax.xml.bind.annotation.*;
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 @XmlType(name = "binding")
-@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
+@XmlSeeAlso({QueueBindingDTO.class, SubscriptionBindingDTO.class})
 @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class BindingDTO {

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationDTO.java Wed Dec  8 20:17:12 2010
@@ -43,4 +43,6 @@ public class DestinationDTO extends Stri
     @XmlAttribute
     public Boolean unified;
 
+    public String slow_consumer_policy;
+
 }

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java (from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java&r1=1043576&r2=1043655&rev=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueBindingDTO.java Wed Dec  8 20:17:12 2010
@@ -26,9 +26,14 @@ import javax.xml.bind.annotation.*;
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlType(name = "binding")
-@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@XmlRootElement(name = "queue-binding")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class BindingDTO {
+public class QueueBindingDTO extends BindingDTO {
+
+    /**
+     * A label that describes the binding
+     */
+    @XmlAttribute
+    public String destination;
+
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Wed Dec  8 20:17:12 2010
@@ -32,8 +32,8 @@ import java.util.List;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class QueueStatusDTO extends LongIdDTO {
 
-    @XmlAttribute
-    public String label;
+    @XmlElement
+    public BindingDTO binding;
 
     @XmlAttribute(name="enqueue-item-counter")
     public long enqueue_item_counter;

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java (from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java&r1=1043576&r2=1043655&rev=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/SubscriptionBindingDTO.java Wed Dec  8 20:17:12 2010
@@ -16,9 +16,10 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
 
 /**
  * <p>
@@ -26,9 +27,17 @@ import javax.xml.bind.annotation.*;
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlType(name = "binding")
-@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@XmlRootElement(name = "subscription-binding")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class BindingDTO {
+public class SubscriptionBindingDTO extends BindingDTO {
+
+    public String destination;
+
+    public String filter;
+
+    @XmlAttribute(name="client-id")
+    public String client_id;
+
+    @XmlAttribute(name="subscription-id")
+    public String subscription_id;
 }
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java (from r1043576, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java&r1=1043576&r2=1043655&rev=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BindingDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TempBindingDTO.java Wed Dec  8 20:17:12 2010
@@ -16,9 +16,10 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
-import javax.xml.bind.annotation.*;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
 
 /**
  * <p>
@@ -26,9 +27,7 @@ import javax.xml.bind.annotation.*;
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlType(name = "binding")
-@XmlSeeAlso({PointToPointBindingDTO.class, DurableSubscriptionBindingDTO.class})
-@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+@XmlRootElement(name = "temp-binding")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class BindingDTO {
+public class TempBindingDTO extends BindingDTO {
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Dec  8 20:17:12 2010
@@ -36,7 +36,7 @@ import org.apache.activemq.apollo.store.
 import org.apache.activemq.apollo.util._
 import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
-import org.apache.activemq.apollo.dto.{StompConnectionStatusDTO, BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
+import org.apache.activemq.apollo.dto.{StompConnectionStatusDTO, BindingDTO, SubscriptionBindingDTO, QueueBindingDTO}
 import scala.util.continuations._
 
 /**
@@ -460,6 +460,10 @@ class StompProtocolHandler extends Proto
     connection.transport.resumeRead
   }
 
+  def weird(headers:HeaderMap) = {
+    println("weird: "+headers)
+  }
+
   def on_stomp_connect(headers:HeaderMap):Unit = {
 
     security_context.user = get(headers, LOGIN).toString
@@ -542,7 +546,9 @@ class StompProtocolHandler extends Proto
         } else {
           val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
           session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
-
+          if( connection_sink==null ) {
+            weird(headers)
+          }
           connection_sink.offer(
             StompFrame(CONNECTED, List(
               (VERSION, protocol_version),
@@ -742,7 +748,7 @@ class StompProtocolHandler extends Proto
       // recover the queue on restart and rebind it the
       // way again)
       if (topic) {
-        val rc = new DurableSubscriptionBindingDTO
+        val rc = new SubscriptionBindingDTO
         rc.destination = DestinationParser.encode_path(destination.name)
         // TODO:
         // rc.client_id =
@@ -750,7 +756,7 @@ class StompProtocolHandler extends Proto
         rc.filter = if (selector == null) null else selector._1
         rc
       } else {
-        val rc = new PointToPointBindingDTO
+        val rc = new QueueBindingDTO
         rc.destination = DestinationParser.encode_path(destination.name)
         rc
       }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Dec  8 20:17:12 2010
@@ -26,6 +26,7 @@ import collection.JavaConversions
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker._
 import collection.mutable.ListBuffer
+import scala.util.continuations._
 
 /**
  * <p>
@@ -144,6 +145,14 @@ case class RuntimeResource(parent:Broker
     link
   }
 
+  def link(queue:Queue) = {
+    val link = new LinkDTO()
+    link.kind = "queue"
+    link.ref = queue.id.toString
+    link.label = queue.binding.label
+    link
+  }
+
   @GET @Path("virtual-hosts/{id}/destinations/{dest}")
   def destination(@PathParam("id") id : Long, @PathParam("dest") dest : Long):DestinationStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
@@ -154,8 +163,15 @@ case class RuntimeResource(parent:Broker
         node.queues.foreach { q=>
           result.queues.add(new LongIdLabeledDTO(q.id, q.binding.label))
         }
-        node.broadcast_consumers.flatMap( _.connection ).foreach { connection=>
-          result.consumers.add(link(connection))
+        node.broadcast_consumers.foreach { consumer=>
+          consumer match {
+            case queue:Queue =>
+              result.consumers.add(link(queue))
+            case _ =>
+              consumer.connection.foreach{c=>
+                result.consumers.add(link(c))
+              }
+          }
         }
         node.broadcast_producers.flatMap( _.producer.connection ).foreach { connection=>
           result.producers.add(link(connection))
@@ -166,81 +182,96 @@ case class RuntimeResource(parent:Broker
     }
   }
 
+  @GET @Path("virtual-hosts/{id}/queues/{queue}")
+  def queue(@PathParam("id") id : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean):QueueStatusDTO = {
+    with_virtual_host(id) { case (virtualHost,cb) =>
+      reset {
+        val queue = virtualHost.router.get_queue(qid)
+        status(queue, entries, cb)
+      }
+    }
+  }
+
   @GET @Path("virtual-hosts/{id}/destinations/{dest}/queues/{queue}")
-  def queue(@PathParam("id") id : Long, @PathParam("dest") dest : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
+  def destination_queue(@PathParam("id") id : Long, @PathParam("dest") dest : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
       import JavaConversions._
-      (virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=> node.queues.find  { _.id == qid } }) match {
-        case None=> cb(None)
-        case Some(q) => q.dispatchQueue {
-          val result = new QueueStatusDTO
-          result.id = q.id
-          result.label = q.binding.label
-          result.capacity_used = q.capacity_used
-          result.capacity = q.capacity
-
-          result.enqueue_item_counter = q.enqueue_item_counter
-          result.dequeue_item_counter = q.dequeue_item_counter
-          result.enqueue_size_counter = q.enqueue_size_counter
-          result.dequeue_size_counter = q.dequeue_size_counter
-          result.nack_item_counter = q.nack_item_counter
-          result.nack_size_counter = q.nack_size_counter
-
-          result.queue_size = q.queue_size
-          result.queue_items = q.queue_items
-
-          result.loading_size = q.loading_size
-          result.flushing_size = q.flushing_size
-          result.flushed_items = q.flushed_items
-
-          if( entries ) {
-            var cur = q.head_entry
-            while( cur!=null ) {
-
-              val e = new EntryStatusDTO
-              e.seq = cur.seq
-              e.count = cur.count
-              e.size = cur.size
-              e.consumer_count = cur.parked.size
-              e.is_prefetched = cur.is_prefetched
-              e.state = cur.label
-
-              result.entries.add(e)
-
-              cur = if( cur == q.tail_entry ) {
-                null
-              } else {
-                cur.nextOrTail
-              }
-            }
-          }
+      val queue = virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=> node.queues.find  { _.id == qid } }
+      status(queue, entries, cb)
+    }
+  }
 
-          q.inbound_sessions.flatMap( _.producer.connection ).foreach { connection=>
-            result.producers.add(link(connection))
-          }
-          q.all_subscriptions.valuesIterator.toSeq.foreach{ sub =>
-            val status = new QueueConsumerStatusDTO
-            sub.consumer.connection.foreach(x=> status.link = link(x))
-            status.total_dispatched_count = sub.total_dispatched_count
-            status.total_dispatched_size = sub.total_dispatched_size
-            status.total_ack_count = sub.total_ack_count
-            status.total_nack_count = sub.total_nack_count
-            status.acquired_size = sub.acquired_size
-            status.acquired_count = sub.acquired_count
-            status.waiting_on = if( sub.full ) {
-              "ack"
-            } else if( !sub.pos.is_loaded ) {
-              "load"
-            } else if( !sub.pos.is_tail ) {
-              "producer"
-            } else {
-              "dispatch"
-            }
-            result.consumers.add(status)
+  def status(qo:Option[Queue], entries:Boolean=false, cb:Option[QueueStatusDTO]=>Unit):Unit = if(qo==None) {
+    cb(None)
+  } else {
+    val q = qo.get
+    q.dispatchQueue {
+      val rc = new QueueStatusDTO
+      rc.id = q.id
+      rc.binding = q.binding.binding_dto
+      rc.capacity_used = q.capacity_used
+      rc.capacity = q.capacity
+
+      rc.enqueue_item_counter = q.enqueue_item_counter
+      rc.dequeue_item_counter = q.dequeue_item_counter
+      rc.enqueue_size_counter = q.enqueue_size_counter
+      rc.dequeue_size_counter = q.dequeue_size_counter
+      rc.nack_item_counter = q.nack_item_counter
+      rc.nack_size_counter = q.nack_size_counter
+
+      rc.queue_size = q.queue_size
+      rc.queue_items = q.queue_items
+
+      rc.loading_size = q.loading_size
+      rc.flushing_size = q.flushing_size
+      rc.flushed_items = q.flushed_items
+
+      if( entries ) {
+        var cur = q.head_entry
+        while( cur!=null ) {
+
+          val e = new EntryStatusDTO
+          e.seq = cur.seq
+          e.count = cur.count
+          e.size = cur.size
+          e.consumer_count = cur.parked.size
+          e.is_prefetched = cur.is_prefetched
+          e.state = cur.label
+
+          rc.entries.add(e)
+
+          cur = if( cur == q.tail_entry ) {
+            null
+          } else {
+            cur.nextOrTail
           }
-          cb(Some(result))
         }
       }
+
+      q.inbound_sessions.flatMap( _.producer.connection ).foreach { connection=>
+        rc.producers.add(link(connection))
+      }
+      q.all_subscriptions.valuesIterator.toSeq.foreach{ sub =>
+        val status = new QueueConsumerStatusDTO
+        sub.consumer.connection.foreach(x=> status.link = link(x))
+        status.total_dispatched_count = sub.total_dispatched_count
+        status.total_dispatched_size = sub.total_dispatched_size
+        status.total_ack_count = sub.total_ack_count
+        status.total_nack_count = sub.total_nack_count
+        status.acquired_size = sub.acquired_size
+        status.acquired_count = sub.acquired_count
+        status.waiting_on = if( sub.full ) {
+          "ack"
+        } else if( sub.pos.is_tail ) {
+          "producer"
+        } else if( !sub.pos.is_loaded ) {
+          "load"
+        } else {
+          "dispatch"
+        }
+        rc.consumers.add(status)
+      }
+      cb(Some(rc))
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade Wed Dec  8 20:17:12 2010
@@ -22,13 +22,15 @@
 
 h1 Destination: #{name}
 
-h2 Queues
+h3 Queue Domain
 ul
   - for( x <- queues )
     li
-      a(href={ path("queues/"+x.id) }) #{x.label}
+      a(href={ path("../../queues/"+x.id) }) #{x.label}
 
-h3 Broadcast Producers
+h3 Topic Domain
+
+h4 Publishers
 ul
   - for( x <- producers )
     - x.kind match
@@ -37,10 +39,13 @@ ul
           a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
       - case _ =>
 
-h3 Broadcast Consumers
+h4 Subscribers
 ul
   - for( x <- consumers )
     - x.kind match
+      - case "queue" =>
+        li
+          a(href={ path("../../queues/"+x.ref) }) #{x.label}
       - case "connection" =>
         li
           a(href={ path("../../../../connections/"+x.ref) }) #{x.label}

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1043655&r1=1043654&r2=1043655&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Wed Dec  8 20:17:12 2010
@@ -20,7 +20,21 @@
 .breadcumbs
   a(href={strip_resolve("..")}) Back
 
-h1 Queue: #{label}
+- binding match
+  - case x:QueueBindingDTO =>
+    h1 Queue #{x.destination}
+
+  - case x:SubscriptionBindingDTO =>
+    h1 Durable Subscription on #{x.destination}
+    p client id: ${x.client_id}
+    p subscription id: ${x.subscription_id}
+    p filter: ${x.filter}
+
+  - case x:TempBindingDTO =>
+    h1 Temporary Queue
+
+  - case x =>
+    h1 Unknown Queue Type: #{x.getClass.getName}
 
 h2 Current Size
 
@@ -51,7 +65,7 @@ ul
     - x.kind match
       - case "connection" =>
         li.producer
-          a(href={ path("../../../../../../connections/"+x.ref) }) #{x.label}
+          a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
       - case _ =>
 
 
@@ -61,7 +75,7 @@ ul
     - import consumer._
     li.consumer
       - if( link !=null )
-        a(href={ path("../../../../../../connections/"+link.ref ) }) #{link.label}
+        a(href={ path("../../../../connections/"+link.ref ) }) #{link.label}
 
       p next message seq: #{position}
       p acquired: #{acquired_count} messages (#{memory(acquired_size)})