You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/20 18:52:58 UTC

svn commit: r965902 - in /activemq/sandbox/activemq-apollo-actor: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apo...

Author: chirino
Date: Tue Jul 20 16:52:58 2010
New Revision: 965902

URL: http://svn.apache.org/viewvc?rev=965902&view=rev
Log:
web console now shows the consumers/producers on a destination or queue

Modified:
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
    activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml
    activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Tue Jul 20 16:52:58 2010
@@ -195,11 +195,11 @@ class DurableSubBinding(val binding_data
 
 
   def unbind(node: RoutingNode, queue: Queue) = {
-    node.add_broadcast_consumer(queue)
+    node.remove_broadcast_consumer(queue)
   }
 
   def bind(node: RoutingNode, queue: Queue) = {
-    node.remove_broadcast_consumer(queue)
+    node.add_broadcast_consumer(queue)
   }
 
   def label = {

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Tue Jul 20 16:52:58 2010
@@ -32,6 +32,8 @@ trait DeliveryProducer {
 
   def dispatchQueue:DispatchQueue
 
+  def connection:Option[BrokerConnection] = None
+
   def collocate(value:DispatchQueue):Unit = {
     if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
       println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
@@ -47,6 +49,9 @@ trait DeliveryProducer {
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait DeliveryConsumer extends Retained {
+
+  def connection:Option[BrokerConnection] = None
+
   def browser = false
   def dispatchQueue:DispatchQueue;
   def matches(message:Delivery):Boolean

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Jul 20 16:52:58 2010
@@ -41,6 +41,7 @@ object Queue extends Log {
 class Queue(val host: VirtualHost, var id:Long, val binding:Binding) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
   override protected def log = Queue
 
+  var inbound_sessions = Set[DeliverySession]()
   var all_subscriptions = Map[DeliveryConsumer, Subscription]()
   var fast_subscriptions = List[Subscription]()
 
@@ -202,7 +203,8 @@ class Queue(val host: VirtualHost, var i
   }
 
   protected def _stop(onCompleted: Runnable) = {
-    throw new AssertionError("Not implemented.");
+    // TODO: perhaps we should remove all the entries
+    onCompleted.run
   }
 
   def addCapacity(amount:Int) = {
@@ -443,7 +445,10 @@ class Queue(val host: VirtualHost, var i
     }
     messages.refiller.run
   }
+
+
   
+
   /////////////////////////////////////////////////////////////////////
   //
   // Implementation of the DeliveryConsumer trait.  Allows this queue
@@ -456,20 +461,23 @@ class Queue(val host: VirtualHost, var i
   def connect(p: DeliveryProducer) = new DeliverySession {
     retain
 
-    dispatchQueue {
-      addCapacity( tune_producer_buffer )
-    }
-
     override def consumer = Queue.this
 
     override def producer = p
 
     val session = session_manager.open(producer.dispatchQueue)
 
+    dispatchQueue {
+      inbound_sessions += this
+      addCapacity( tune_producer_buffer )
+    }
+
+
     def close = {
       session_manager.close(session)
       dispatchQueue {
         addCapacity( -tune_producer_buffer )
+        inbound_sessions -= this
       }
       release
     }
@@ -1358,7 +1366,9 @@ class Subscription(queue:Queue) extends 
   def advance(value:QueueEntry):Unit = {
 
     assert(value!=null)
-    assert(pos!=null)
+    if( pos == null ) {
+      assert(pos!=null)
+    }
 
     advanced_size += pos.size
 
@@ -1397,8 +1407,11 @@ class Subscription(queue:Queue) extends 
     acquired.addLast(this)
     acquired_size += entry.size
 
-    def ack(sb:StoreUOW) = {
-
+    def ack(sb:StoreUOW):Unit = {
+      // The session may have already been closed..
+      if( session == null ) {
+        return;
+      }
       if (entry.messageKey != -1) {
         val storeBatch = if( sb == null ) {
           queue.host.store.createStoreUOW
@@ -1430,7 +1443,11 @@ class Subscription(queue:Queue) extends 
       next.run
     }
 
-    def nack = {
+    def nack:Unit = {
+      // The session may have already been closed..
+      if( session == null ) {
+        return;
+      }
 
       entry.as_loaded.acquired = false
       acquired_size -= entry.size

Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Tue Jul 20 16:52:58 2010
@@ -188,30 +188,19 @@ class VirtualHost(val broker: Broker, va
 
   override protected def _stop(onCompleted:Runnable):Unit = {
 
-//    TODO:
-//      val tmp = new ArrayList[Queue](queues.values())
-//      for (queue <-  tmp) {
-//        queue.shutdown
-//      }
-
-// TODO:
-//        ArrayList<IQueue<Long, MessageDelivery>> durableQueues = new ArrayList<IQueue<Long,MessageDelivery>>(queueStore.getDurableQueues());
-//        done = new RunnableCountDownLatch(durableQueues.size());
-//        for (IQueue<Long, MessageDelivery> queue : durableQueues) {
-//            queue.shutdown(done);
-//        }
-//        done.await();
-
+    val tracker = new LoggingTracker("virtual host shutdown", dispatchQueue)
+    router.queues.valuesIterator.foreach { queue=>
+      tracker.stop(queue)
+    }
     if( direct_buffer_pool!=null ) {
       direct_buffer_pool.stop
       direct_buffer_pool = null
     }
 
     if( store!=null ) {
-      store.stop(onCompleted);
-    } else {
-      onCompleted.run
+      tracker.stop(store);
     }
+    tracker.callback(onCompleted)
   }
 
 

Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Tue Jul 20 16:52:58 2010
@@ -87,4 +87,16 @@ public class QueueStatusDTO extends Long
     public List<EntryStatusDTO> entries = new ArrayList<EntryStatusDTO>();
 
 
+    /**
+     * Ids of all connections that are producing to the destination
+     */
+    @XmlElement(name="producer")
+    public List<LongIdLabeledDTO> producers = new ArrayList<LongIdLabeledDTO>();
+
+    /**
+     * Ids of all connections that are consuming from the destination
+     */
+    @XmlElement(name="consumer")
+    public List<LongIdLabeledDTO> consumers = new ArrayList<LongIdLabeledDTO>();
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Tue Jul 20 16:52:58 2010
@@ -140,6 +140,8 @@ class StompProtocolHandler extends Proto
       dispatchQueue.release
     })
 
+    override def connection = Some(StompProtocolHandler.this.connection) 
+
     def matches(delivery:Delivery) = {
       if( delivery.message.protocol eq StompProtocol ) {
         if( selector!=null ) {
@@ -306,6 +308,9 @@ class StompProtocolHandler extends Proto
             // create the producer route...
 
             val producer = new DeliveryProducer() {
+
+              override def connection = Some( StompProtocolHandler.this.connection )
+
               override def dispatchQueue = queue
             }
 

Modified: activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Tue Jul 20 16:52:58 2010
@@ -54,6 +54,7 @@ object StompLoadClient {
   var headers = List[String]()
   var ack = "auto"
   var selector:String = null
+  var durable = false
 
   var destinationType = "queue"
   var destinationCount = 1
@@ -161,6 +162,7 @@ object StompLoadClient {
     "consumerSleep    = "+consumerSleep+"\n"+
     "ack              = "+ack+"\n"+
     "selector         = "+selector+"\n"+
+    "durable          = "+durable+"\n"+
     ""
 
   }
@@ -352,15 +354,14 @@ object StompLoadClient {
       while (!done.get) {
         connect {
           val headers = Map[AsciiBuffer, AsciiBuffer]()
-          client.send("""
-SUBSCRIBE""" + (if(selector==null) {""} else {
-"""
-selector: """+selector
-}) + """
-ack:"""+ack+"""
-destination:"""+destination(id)+"""
+          client.send(
+            "SUBSCRIBE\n" +
+             (if(!durable) {""} else {"id:durable:mysub-"+id+"\n"}) + 
+             (if(selector==null) {""} else {"selector: "+selector+"\n"}) +
+             "ack:"+ack+"\n"+
+             "destination:"+destination(id)+"\n"+
+             "\n")
 
-""")
           client.flush
           receiveLoop
         }

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Tue Jul 20 16:52:58 2010
@@ -147,6 +147,13 @@ case class RuntimeResource(parent:Broker
         node.queues.foreach { q=>
           result.queues.add(new LongIdLabeledDTO(q.id, q.binding.label))
         }
+        node.broadcast_consumers.flatMap( _.connection ).foreach { connection=>
+          result.consumers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+        }
+        node.broadcast_producers.flatMap( _.producer.connection ).foreach { connection=>
+          result.producers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+        }
+
         result
       })
     }
@@ -200,6 +207,14 @@ case class RuntimeResource(parent:Broker
               }
             }
           }
+
+          q.inbound_sessions.flatMap( _.producer.connection ).foreach { connection=>
+            result.producers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+          }
+          q.all_subscriptions.keysIterator.toSeq.flatMap( _.connection ).foreach { connection=>
+            result.consumers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+          }
+
           result
         }
       }

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml Tue Jul 20 16:52:58 2010
@@ -25,13 +25,13 @@
     %li
       %a(href={ path("queues/"+x.id) }) #{x.label}
 
-%h2 Producers
+%h3 Broadcast Producers
 %ul
   - for( x <- producers )
     %li
       %a(href={ path("../../../../connections/"+x.id) }) #{x.label}
 
-%h2 Consumers
+%h3 Broadcast Consumers
 %ul
   - for( x <- consumers )
     %li

Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml Tue Jul 20 16:52:58 2010
@@ -42,6 +42,19 @@
 %p flushing out of memory: #{memory(flushing_size)}
 %p holding : #{flushed_items} flushed message references
 
+%h3 Producers
+%ul
+  - for( x <- producers )
+    %li
+      %a(href={ path("../../../../../../connections/"+x.id) }) #{x.label}
+
+%h3 Consumers
+%ul
+  - for( x <- consumers )
+    %li
+      %a(href={ path("../../../../../../connections/"+x.id) }) #{x.label}
+
+
 - if ( entries.isEmpty )
   %h2
     Entries Dump