You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/08 03:49:29 UTC

svn commit: r1043284 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/ apollo-web...

Author: chirino
Date: Wed Dec  8 02:49:28 2010
New Revision: 1043284

URL: http://svn.apache.org/viewvc?rev=1043284&view=rev
Log:
web interface is now displaying more queue subscriber details.

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java
      - copied, changed from r1043042, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Dec  8 02:49:28 2010
@@ -508,8 +508,7 @@ class Queue(val host: VirtualHost, var i
 
   def bind(values: List[DeliveryConsumer]) = retaining(values) {
     for (consumer <- values) {
-      val subscription = new Subscription(this)
-      subscription.open(consumer)
+      val subscription = new Subscription(this, consumer)
       all_subscriptions += consumer -> subscription
       addCapacity( tune_consumer_buffer )
     }
@@ -1262,7 +1261,7 @@ class QueueEntry(val queue:Queue, val se
  * tracks the entries which the consumer has acquired.
  *
  */
-class Subscription(queue:Queue) extends DeliveryProducer with DispatchLogging {
+class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with DispatchLogging {
   override protected def log = Queue
 
   def dispatchQueue = queue.dispatchQueue
@@ -1273,6 +1272,7 @@ class Subscription(queue:Queue) extends 
   var pos:QueueEntry = null
 
   var acquired_size = 0L
+  def acquired_count = acquired.size()
 
   var total_advanced_size = 0L
   var advanced_size = 0
@@ -1281,6 +1281,12 @@ class Subscription(queue:Queue) extends 
   var best_advanced_size = queue.tune_consumer_buffer * 100
   var tail_parkings = 1
 
+  var total_dispatched_count = 0L
+  var total_dispatched_size = 0L
+
+  var total_ack_count = 0L
+  var total_nack_count = 0L
+
   override def toString = {
     def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
     "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+"}"
@@ -1288,17 +1294,16 @@ class Subscription(queue:Queue) extends 
 
   def browser = session.consumer.browser
 
-  def open(consumer: DeliveryConsumer) = {
-    pos = queue.head_entry;
-    session = consumer.connect(this)
-    session.refiller = pos
-    queue.head_entry ::= this
-
-    if( queue.serviceState.isStarted ) {
-      // kick off the initial dispatch.
-      refill_prefetch
-      queue.dispatchQueue << queue.head_entry
-    }
+  // This opens up the consumer
+  pos = queue.head_entry;
+  session = consumer.connect(this)
+  session.refiller = pos
+  queue.head_entry ::= this
+
+  if( queue.serviceState.isStarted ) {
+    // kick off the initial dispatch.
+    refill_prefetch
+    queue.dispatchQueue << queue.head_entry
   }
 
   def close() = {
@@ -1358,7 +1363,15 @@ class Subscription(queue:Queue) extends 
 
   def matches(entry:Delivery) = session.consumer.matches(entry)
   def full = session.full
-  def offer(delivery:Delivery) = session.offer(delivery)
+  def offer(delivery:Delivery) = {
+    if( session.offer(delivery) ) {
+      total_dispatched_count += 1
+      total_dispatched_size += delivery.size
+      true
+    } else {
+      false
+    }
+  }
 
   def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
 
@@ -1399,6 +1412,7 @@ class Subscription(queue:Queue) extends 
       if( session == null ) {
         return;
       }
+      total_ack_count += 1
       if (entry.messageKey != -1) {
         val storeBatch = if( sb == null ) {
           queue.host.store.createStoreUOW
@@ -1436,6 +1450,7 @@ class Subscription(queue:Queue) extends 
         return;
       }
 
+      total_nack_count += 1
       entry.as_loaded.acquired = false
       acquired_size -= entry.size
 

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java Wed Dec  8 02:49:28 2010
@@ -43,14 +43,14 @@ public class DestinationStatusDTO extend
      */
     @XmlElement(name="producer")
     @JsonProperty("producers")
-    public List<LongIdLabeledDTO> producers = new ArrayList<LongIdLabeledDTO>();
+    public List<LinkDTO> producers = new ArrayList<LinkDTO>();
 
     /**
      * Ids of all connections that are consuming from the destination
      */
     @XmlElement(name="consumer")
     @JsonProperty("consumers")
-    public List<LongIdLabeledDTO> consumers = new ArrayList<LongIdLabeledDTO>();
+    public List<LinkDTO> consumers = new ArrayList<LinkDTO>();
 
     /**
      * Ids of all queues that are associated with the destination

Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java?rev=1043284&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java Wed Dec  8 02:49:28 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonTypeInfo;
+
+import javax.xml.bind.annotation.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="link")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class LinkDTO {
+
+    @XmlAttribute
+    public String kind;
+
+    @XmlAttribute
+    public String ref;
+
+    @XmlAttribute
+    public String label;
+
+
+}
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java (from r1043042, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java&r1=1043042&r2=1043284&rev=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java Wed Dec  8 02:49:28 2010
@@ -16,46 +16,34 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonProperty;
-
 import javax.xml.bind.annotation.*;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="destination-status")
+@XmlRootElement(name="queue-consumer-status")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class DestinationStatusDTO extends LongIdDTO {
+public class QueueConsumerStatusDTO {
 
     /**
-     * The destination name
+     * link to who is consuming from the queue.
      */
-    @XmlAttribute
-    public String name;
+    public LinkDTO link;
 
-    /**
-     * Ids of all connections that are producing to the destination
-     */
-    @XmlElement(name="producer")
-    @JsonProperty("producers")
-    public List<LongIdLabeledDTO> producers = new ArrayList<LongIdLabeledDTO>();
+    public long position = 0;
 
-    /**
-     * Ids of all connections that are consuming from the destination
-     */
-    @XmlElement(name="consumer")
-    @JsonProperty("consumers")
-    public List<LongIdLabeledDTO> consumers = new ArrayList<LongIdLabeledDTO>();
+    public int acquired_count;
+    public long acquired_size;
+
+    public long total_dispatched_count;
+    public long total_dispatched_size;
+
+    public long total_ack_count;
+    public long total_nack_count;
 
     /**
-     * Ids of all queues that are associated with the destination
+     * What the consumer is currently waiting on
      */
-    @XmlElement(name="queue")
-    @JsonProperty("queues")
-    public List<LongIdLabeledDTO> queues = new ArrayList<LongIdLabeledDTO>();
+    @XmlAttribute(name="waiting-on")
+	public String waiting_on;
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Wed Dec  8 02:49:28 2010
@@ -32,12 +32,6 @@ import java.util.List;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class QueueStatusDTO extends LongIdDTO {
 
-    /**
-     * A unique id of the object within it's container
-     */
-	@XmlAttribute
-	public long id;
-
     @XmlAttribute
     public String label;
 
@@ -91,12 +85,12 @@ public class QueueStatusDTO extends Long
      * Ids of all connections that are producing to the destination
      */
     @XmlElement(name="producer")
-    public List<LongIdLabeledDTO> producers = new ArrayList<LongIdLabeledDTO>();
+    public List<LinkDTO> producers = new ArrayList<LinkDTO>();
 
     /**
      * Ids of all connections that are consuming from the destination
      */
     @XmlElement(name="consumer")
-    public List<LongIdLabeledDTO> consumers = new ArrayList<LongIdLabeledDTO>();
+    public List<QueueConsumerStatusDTO> consumers = new ArrayList<QueueConsumerStatusDTO>();
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Wed Dec  8 02:49:28 2010
@@ -42,4 +42,6 @@ KeyStorageDTO
 SimpleStoreStatusDTO
 NullStoreDTO
 QueueDTO
-DestinationDTO
\ No newline at end of file
+DestinationDTO
+LinkDTO
+QueueConsumerStatusDTO

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Dec  8 02:49:28 2010
@@ -136,6 +136,14 @@ case class RuntimeResource(parent:Broker
     rc
   }
 
+  def link(connection:BrokerConnection) = {
+    val link = new LinkDTO()
+    link.kind = "connection"
+    link.ref = connection.id.toString
+    link.label = connection.transport.getRemoteAddress
+    link
+  }
+
   @GET @Path("virtual-hosts/{id}/destinations/{dest}")
   def destination(@PathParam("id") id : Long, @PathParam("dest") dest : Long):DestinationStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
@@ -147,10 +155,10 @@ case class RuntimeResource(parent:Broker
           result.queues.add(new LongIdLabeledDTO(q.id, q.binding.label))
         }
         node.broadcast_consumers.flatMap( _.connection ).foreach { connection=>
-          result.consumers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+          result.consumers.add(link(connection))
         }
         node.broadcast_producers.flatMap( _.producer.connection ).foreach { connection=>
-          result.producers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+          result.producers.add(link(connection))
         }
 
         result
@@ -162,9 +170,9 @@ case class RuntimeResource(parent:Broker
   def queue(@PathParam("id") id : Long, @PathParam("dest") dest : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
     with_virtual_host(id) { case (virtualHost,cb) =>
       import JavaConversions._
-      val rc = virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=>
-        node.queues.find  { _.id == qid } map { q=>
-
+      (virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=> node.queues.find  { _.id == qid } }) match {
+        case None=> cb(None)
+        case Some(q) => q.dispatchQueue {
           val result = new QueueStatusDTO
           result.id = q.id
           result.label = q.binding.label
@@ -208,16 +216,31 @@ case class RuntimeResource(parent:Broker
           }
 
           q.inbound_sessions.flatMap( _.producer.connection ).foreach { connection=>
-            result.producers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+            result.producers.add(link(connection))
           }
-          q.all_subscriptions.keysIterator.toSeq.flatMap( _.connection ).foreach { connection=>
-            result.consumers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+          q.all_subscriptions.valuesIterator.toSeq.foreach{ sub =>
+            val status = new QueueConsumerStatusDTO
+            sub.consumer.connection.foreach(x=> status.link = link(x))
+            status.total_dispatched_count = sub.total_dispatched_count
+            status.total_dispatched_size = sub.total_dispatched_size
+            status.total_ack_count = sub.total_ack_count
+            status.total_nack_count = sub.total_nack_count
+            status.acquired_size = sub.acquired_size
+            status.acquired_count = sub.acquired_count
+            status.waiting_on = if( sub.full ) {
+              "ack"
+            } else if( !sub.pos.is_loaded ) {
+              "load"
+            } else if( !sub.pos.is_tail ) {
+              "producer"
+            } else {
+              "dispatch"
+            }
+            result.consumers.add(status)
           }
-
-          result
+          cb(Some(result))
         }
       }
-      cb(rc)
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade Wed Dec  8 02:49:28 2010
@@ -31,13 +31,19 @@ ul
 h3 Broadcast Producers
 ul
   - for( x <- producers )
-    li
-      a(href={ path("../../../../connections/"+x.id) }) #{x.label}
+    - x.kind match
+      - case "connection" =>
+        li
+          a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
+      - case _ =>
 
 h3 Broadcast Consumers
 ul
   - for( x <- consumers )
-    li
-      a(href={ path("../../../../connections/"+x.id) }) #{x.label}
+    - x.kind match
+      - case "connection" =>
+        li
+          a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
+      - case _ =>
 
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Wed Dec  8 02:49:28 2010
@@ -48,15 +48,27 @@ p holding : #{flushed_items} flushed mes
 h3 Producers
 ul
   - for( x <- producers )
-    li
-      a(href={ path("../../../../../../connections/"+x.id) }) #{x.label}
+    - x.kind match
+      - case "connection" =>
+        li.producer
+          a(href={ path("../../../../../../connections/"+x.ref) }) #{x.label}
+      - case _ =>
+
 
 h3 Consumers
 ul
-  - for( x <- consumers )
-    li
-      a(href={ path("../../../../../../connections/"+x.id) }) #{x.label}
+  - for( consumer <- consumers )
+    - import consumer._
+    li.consumer
+      - if( link !=null )
+        a(href={ path("../../../../../../connections/"+link.ref ) }) #{link.label}
 
+      p next message seq: #{position}
+      p acquired: #{acquired_count} messages (#{memory(acquired_size)})
+      p dispatched: #{total_dispatched_count} messages (#{memory(total_dispatched_size)})
+      p acks: #{total_ack_count} messages
+      p naks: #{total_nack_count} messages
+      p waiting on: #{waiting_on}
 
 - if ( entries.isEmpty )
   h2