You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/08 03:49:29 UTC
svn commit: r1043284 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/ apollo-web...
Author: chirino
Date: Wed Dec 8 02:49:28 2010
New Revision: 1043284
URL: http://svn.apache.org/viewvc?rev=1043284&view=rev
Log:
web interface is now displaying more queue subscriber details.
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java
- copied, changed from r1043042, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Dec 8 02:49:28 2010
@@ -508,8 +508,7 @@ class Queue(val host: VirtualHost, var i
def bind(values: List[DeliveryConsumer]) = retaining(values) {
for (consumer <- values) {
- val subscription = new Subscription(this)
- subscription.open(consumer)
+ val subscription = new Subscription(this, consumer)
all_subscriptions += consumer -> subscription
addCapacity( tune_consumer_buffer )
}
@@ -1262,7 +1261,7 @@ class QueueEntry(val queue:Queue, val se
* tracks the entries which the consumer has acquired.
*
*/
-class Subscription(queue:Queue) extends DeliveryProducer with DispatchLogging {
+class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer with DispatchLogging {
override protected def log = Queue
def dispatchQueue = queue.dispatchQueue
@@ -1273,6 +1272,7 @@ class Subscription(queue:Queue) extends
var pos:QueueEntry = null
var acquired_size = 0L
+ def acquired_count = acquired.size()
var total_advanced_size = 0L
var advanced_size = 0
@@ -1281,6 +1281,12 @@ class Subscription(queue:Queue) extends
var best_advanced_size = queue.tune_consumer_buffer * 100
var tail_parkings = 1
+ var total_dispatched_count = 0L
+ var total_dispatched_size = 0L
+
+ var total_ack_count = 0L
+ var total_nack_count = 0L
+
override def toString = {
def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
"{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+"}"
@@ -1288,17 +1294,16 @@ class Subscription(queue:Queue) extends
def browser = session.consumer.browser
- def open(consumer: DeliveryConsumer) = {
- pos = queue.head_entry;
- session = consumer.connect(this)
- session.refiller = pos
- queue.head_entry ::= this
-
- if( queue.serviceState.isStarted ) {
- // kick off the initial dispatch.
- refill_prefetch
- queue.dispatchQueue << queue.head_entry
- }
+ // This opens up the consumer
+ pos = queue.head_entry;
+ session = consumer.connect(this)
+ session.refiller = pos
+ queue.head_entry ::= this
+
+ if( queue.serviceState.isStarted ) {
+ // kick off the initial dispatch.
+ refill_prefetch
+ queue.dispatchQueue << queue.head_entry
}
def close() = {
@@ -1358,7 +1363,15 @@ class Subscription(queue:Queue) extends
def matches(entry:Delivery) = session.consumer.matches(entry)
def full = session.full
- def offer(delivery:Delivery) = session.offer(delivery)
+ def offer(delivery:Delivery) = {
+ if( session.offer(delivery) ) {
+ total_dispatched_count += 1
+ total_dispatched_size += delivery.size
+ true
+ } else {
+ false
+ }
+ }
def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
@@ -1399,6 +1412,7 @@ class Subscription(queue:Queue) extends
if( session == null ) {
return;
}
+ total_ack_count += 1
if (entry.messageKey != -1) {
val storeBatch = if( sb == null ) {
queue.host.store.createStoreUOW
@@ -1436,6 +1450,7 @@ class Subscription(queue:Queue) extends
return;
}
+ total_nack_count += 1
entry.as_loaded.acquired = false
acquired_size -= entry.size
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java Wed Dec 8 02:49:28 2010
@@ -43,14 +43,14 @@ public class DestinationStatusDTO extend
*/
@XmlElement(name="producer")
@JsonProperty("producers")
- public List<LongIdLabeledDTO> producers = new ArrayList<LongIdLabeledDTO>();
+ public List<LinkDTO> producers = new ArrayList<LinkDTO>();
/**
* Ids of all connections that are consuming from the destination
*/
@XmlElement(name="consumer")
@JsonProperty("consumers")
- public List<LongIdLabeledDTO> consumers = new ArrayList<LongIdLabeledDTO>();
+ public List<LinkDTO> consumers = new ArrayList<LinkDTO>();
/**
* Ids of all queues that are associated with the destination
Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java?rev=1043284&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LinkDTO.java Wed Dec 8 02:49:28 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonTypeInfo;
+
+import javax.xml.bind.annotation.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="link")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class LinkDTO {
+
+ @XmlAttribute
+ public String kind;
+
+ @XmlAttribute
+ public String ref;
+
+ @XmlAttribute
+ public String label;
+
+
+}
\ No newline at end of file
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java (from r1043042, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java&r1=1043042&r2=1043284&rev=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueConsumerStatusDTO.java Wed Dec 8 02:49:28 2010
@@ -16,46 +16,34 @@
*/
package org.apache.activemq.apollo.dto;
-import org.codehaus.jackson.annotate.JsonProperty;
-
import javax.xml.bind.annotation.*;
-import java.util.ArrayList;
-import java.util.List;
/**
- * <p>
- * </p>
- *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="destination-status")
+@XmlRootElement(name="queue-consumer-status")
@XmlAccessorType(XmlAccessType.FIELD)
-public class DestinationStatusDTO extends LongIdDTO {
+public class QueueConsumerStatusDTO {
/**
- * The destination name
+ * link to who is consuming from the queue.
*/
- @XmlAttribute
- public String name;
+ public LinkDTO link;
- /**
- * Ids of all connections that are producing to the destination
- */
- @XmlElement(name="producer")
- @JsonProperty("producers")
- public List<LongIdLabeledDTO> producers = new ArrayList<LongIdLabeledDTO>();
+ public long position = 0;
- /**
- * Ids of all connections that are consuming from the destination
- */
- @XmlElement(name="consumer")
- @JsonProperty("consumers")
- public List<LongIdLabeledDTO> consumers = new ArrayList<LongIdLabeledDTO>();
+ public int acquired_count;
+ public long acquired_size;
+
+ public long total_dispatched_count;
+ public long total_dispatched_size;
+
+ public long total_ack_count;
+ public long total_nack_count;
/**
- * Ids of all queues that are associated with the destination
+ * What the consumer is currently waiting on
*/
- @XmlElement(name="queue")
- @JsonProperty("queues")
- public List<LongIdLabeledDTO> queues = new ArrayList<LongIdLabeledDTO>();
+ @XmlAttribute(name="waiting-on")
+ public String waiting_on;
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Wed Dec 8 02:49:28 2010
@@ -32,12 +32,6 @@ import java.util.List;
@XmlAccessorType(XmlAccessType.FIELD)
public class QueueStatusDTO extends LongIdDTO {
- /**
- * A unique id of the object within it's container
- */
- @XmlAttribute
- public long id;
-
@XmlAttribute
public String label;
@@ -91,12 +85,12 @@ public class QueueStatusDTO extends Long
* Ids of all connections that are producing to the destination
*/
@XmlElement(name="producer")
- public List<LongIdLabeledDTO> producers = new ArrayList<LongIdLabeledDTO>();
+ public List<LinkDTO> producers = new ArrayList<LinkDTO>();
/**
* Ids of all connections that are consuming from the destination
*/
@XmlElement(name="consumer")
- public List<LongIdLabeledDTO> consumers = new ArrayList<LongIdLabeledDTO>();
+ public List<QueueConsumerStatusDTO> consumers = new ArrayList<QueueConsumerStatusDTO>();
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Wed Dec 8 02:49:28 2010
@@ -42,4 +42,6 @@ KeyStorageDTO
SimpleStoreStatusDTO
NullStoreDTO
QueueDTO
-DestinationDTO
\ No newline at end of file
+DestinationDTO
+LinkDTO
+QueueConsumerStatusDTO
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Wed Dec 8 02:49:28 2010
@@ -136,6 +136,14 @@ case class RuntimeResource(parent:Broker
rc
}
+ def link(connection:BrokerConnection) = {
+ val link = new LinkDTO()
+ link.kind = "connection"
+ link.ref = connection.id.toString
+ link.label = connection.transport.getRemoteAddress
+ link
+ }
+
@GET @Path("virtual-hosts/{id}/destinations/{dest}")
def destination(@PathParam("id") id : Long, @PathParam("dest") dest : Long):DestinationStatusDTO = {
with_virtual_host(id) { case (virtualHost,cb) =>
@@ -147,10 +155,10 @@ case class RuntimeResource(parent:Broker
result.queues.add(new LongIdLabeledDTO(q.id, q.binding.label))
}
node.broadcast_consumers.flatMap( _.connection ).foreach { connection=>
- result.consumers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+ result.consumers.add(link(connection))
}
node.broadcast_producers.flatMap( _.producer.connection ).foreach { connection=>
- result.producers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+ result.producers.add(link(connection))
}
result
@@ -162,9 +170,9 @@ case class RuntimeResource(parent:Broker
def queue(@PathParam("id") id : Long, @PathParam("dest") dest : Long, @PathParam("queue") qid : Long, @QueryParam("entries") entries:Boolean ):QueueStatusDTO = {
with_virtual_host(id) { case (virtualHost,cb) =>
import JavaConversions._
- val rc = virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=>
- node.queues.find { _.id == qid } map { q=>
-
+ (virtualHost.router.routing_nodes.find { _.id == dest } flatMap { node=> node.queues.find { _.id == qid } }) match {
+ case None=> cb(None)
+ case Some(q) => q.dispatchQueue {
val result = new QueueStatusDTO
result.id = q.id
result.label = q.binding.label
@@ -208,16 +216,31 @@ case class RuntimeResource(parent:Broker
}
q.inbound_sessions.flatMap( _.producer.connection ).foreach { connection=>
- result.producers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+ result.producers.add(link(connection))
}
- q.all_subscriptions.keysIterator.toSeq.flatMap( _.connection ).foreach { connection=>
- result.consumers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+ q.all_subscriptions.valuesIterator.toSeq.foreach{ sub =>
+ val status = new QueueConsumerStatusDTO
+ sub.consumer.connection.foreach(x=> status.link = link(x))
+ status.total_dispatched_count = sub.total_dispatched_count
+ status.total_dispatched_size = sub.total_dispatched_size
+ status.total_ack_count = sub.total_ack_count
+ status.total_nack_count = sub.total_nack_count
+ status.acquired_size = sub.acquired_size
+ status.acquired_count = sub.acquired_count
+ status.waiting_on = if( sub.full ) {
+ "ack"
+ } else if( !sub.pos.is_loaded ) {
+ "load"
+ } else if( !sub.pos.is_tail ) {
+ "producer"
+ } else {
+ "dispatch"
+ }
+ result.consumers.add(status)
}
-
- result
+ cb(Some(result))
}
}
- cb(rc)
}
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade Wed Dec 8 02:49:28 2010
@@ -31,13 +31,19 @@ ul
h3 Broadcast Producers
ul
- for( x <- producers )
- li
- a(href={ path("../../../../connections/"+x.id) }) #{x.label}
+ - x.kind match
+ - case "connection" =>
+ li
+ a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
+ - case _ =>
h3 Broadcast Consumers
ul
- for( x <- consumers )
- li
- a(href={ path("../../../../connections/"+x.id) }) #{x.label}
+ - x.kind match
+ - case "connection" =>
+ li
+ a(href={ path("../../../../connections/"+x.ref) }) #{x.label}
+ - case _ =>
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1043284&r1=1043283&r2=1043284&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Wed Dec 8 02:49:28 2010
@@ -48,15 +48,27 @@ p holding : #{flushed_items} flushed mes
h3 Producers
ul
- for( x <- producers )
- li
- a(href={ path("../../../../../../connections/"+x.id) }) #{x.label}
+ - x.kind match
+ - case "connection" =>
+ li.producer
+ a(href={ path("../../../../../../connections/"+x.ref) }) #{x.label}
+ - case _ =>
+
h3 Consumers
ul
- - for( x <- consumers )
- li
- a(href={ path("../../../../../../connections/"+x.id) }) #{x.label}
+ - for( consumer <- consumers )
+ - import consumer._
+ li.consumer
+ - if( link !=null )
+ a(href={ path("../../../../../../connections/"+link.ref ) }) #{link.label}
+ p next message seq: #{position}
+ p acquired: #{acquired_count} messages (#{memory(acquired_size)})
+ p dispatched: #{total_dispatched_count} messages (#{memory(total_dispatched_size)})
+ p acks: #{total_ack_count} messages
+ p naks: #{total_nack_count} messages
+ p waiting on: #{waiting_on}
- if ( entries.isEmpty )
h2