You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/20 18:52:58 UTC
svn commit: r965902 - in /activemq/sandbox/activemq-apollo-actor:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
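apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/ apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/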
Author: chirino
Date: Tue Jul 20 16:52:58 2010
New Revision: 965902
URL: http://svn.apache.org/viewvc?rev=965902&view=rev
Log:
web console now shows the consumers/producers on a destination or queue
Modified:
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml
activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Binding.scala Tue Jul 20 16:52:58 2010
@@ -195,11 +195,11 @@ class DurableSubBinding(val binding_data
def unbind(node: RoutingNode, queue: Queue) = {
- node.add_broadcast_consumer(queue)
+ node.remove_broadcast_consumer(queue)
}
def bind(node: RoutingNode, queue: Queue) = {
- node.remove_broadcast_consumer(queue)
+ node.add_broadcast_consumer(queue)
}
def label = {
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Tue Jul 20 16:52:58 2010
@@ -32,6 +32,8 @@ trait DeliveryProducer {
def dispatchQueue:DispatchQueue
+ def connection:Option[BrokerConnection] = None
+
def collocate(value:DispatchQueue):Unit = {
if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
@@ -47,6 +49,9 @@ trait DeliveryProducer {
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait DeliveryConsumer extends Retained {
+
+ def connection:Option[BrokerConnection] = None
+
def browser = false
def dispatchQueue:DispatchQueue;
def matches(message:Delivery):Boolean
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Jul 20 16:52:58 2010
@@ -41,6 +41,7 @@ object Queue extends Log {
class Queue(val host: VirtualHost, var id:Long, val binding:Binding) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
override protected def log = Queue
+ var inbound_sessions = Set[DeliverySession]()
var all_subscriptions = Map[DeliveryConsumer, Subscription]()
var fast_subscriptions = List[Subscription]()
@@ -202,7 +203,8 @@ class Queue(val host: VirtualHost, var i
}
protected def _stop(onCompleted: Runnable) = {
- throw new AssertionError("Not implemented.");
+ // TODO: perhaps we should remove all the entries
+ onCompleted.run
}
def addCapacity(amount:Int) = {
@@ -443,7 +445,10 @@ class Queue(val host: VirtualHost, var i
}
messages.refiller.run
}
+
+
+
/////////////////////////////////////////////////////////////////////
//
// Implementation of the DeliveryConsumer trait. Allows this queue
@@ -456,20 +461,23 @@ class Queue(val host: VirtualHost, var i
def connect(p: DeliveryProducer) = new DeliverySession {
retain
- dispatchQueue {
- addCapacity( tune_producer_buffer )
- }
-
override def consumer = Queue.this
override def producer = p
val session = session_manager.open(producer.dispatchQueue)
+ dispatchQueue {
+ inbound_sessions += this
+ addCapacity( tune_producer_buffer )
+ }
+
+
def close = {
session_manager.close(session)
dispatchQueue {
addCapacity( -tune_producer_buffer )
+ inbound_sessions -= this
}
release
}
@@ -1358,7 +1366,9 @@ class Subscription(queue:Queue) extends
def advance(value:QueueEntry):Unit = {
assert(value!=null)
- assert(pos!=null)
+ if( pos == null ) {
+ assert(pos!=null)
+ }
advanced_size += pos.size
@@ -1397,8 +1407,11 @@ class Subscription(queue:Queue) extends
acquired.addLast(this)
acquired_size += entry.size
- def ack(sb:StoreUOW) = {
-
+ def ack(sb:StoreUOW):Unit = {
+ // The session may have already been closed..
+ if( session == null ) {
+ return;
+ }
if (entry.messageKey != -1) {
val storeBatch = if( sb == null ) {
queue.host.store.createStoreUOW
@@ -1430,7 +1443,11 @@ class Subscription(queue:Queue) extends
next.run
}
- def nack = {
+ def nack:Unit = {
+ // The session may have already been closed..
+ if( session == null ) {
+ return;
+ }
entry.as_loaded.acquired = false
acquired_size -= entry.size
Modified: activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Tue Jul 20 16:52:58 2010
@@ -188,30 +188,19 @@ class VirtualHost(val broker: Broker, va
override protected def _stop(onCompleted:Runnable):Unit = {
-// TODO:
-// val tmp = new ArrayList[Queue](queues.values())
-// for (queue <- tmp) {
-// queue.shutdown
-// }
-
-// TODO:
-// ArrayList<IQueue<Long, MessageDelivery>> durableQueues = new ArrayList<IQueue<Long,MessageDelivery>>(queueStore.getDurableQueues());
-// done = new RunnableCountDownLatch(durableQueues.size());
-// for (IQueue<Long, MessageDelivery> queue : durableQueues) {
-// queue.shutdown(done);
-// }
-// done.await();
-
+ val tracker = new LoggingTracker("virtual host shutdown", dispatchQueue)
+ router.queues.valuesIterator.foreach { queue=>
+ tracker.stop(queue)
+ }
if( direct_buffer_pool!=null ) {
direct_buffer_pool.stop
direct_buffer_pool = null
}
if( store!=null ) {
- store.stop(onCompleted);
- } else {
- onCompleted.run
+ tracker.stop(store);
}
+ tracker.callback(onCompleted)
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java Tue Jul 20 16:52:58 2010
@@ -87,4 +87,16 @@ public class QueueStatusDTO extends Long
public List<EntryStatusDTO> entries = new ArrayList<EntryStatusDTO>();
+ /**
+ * Ids of all connections that are producing to the destination
+ */
+ @XmlElement(name="producer")
+ public List<LongIdLabeledDTO> producers = new ArrayList<LongIdLabeledDTO>();
+
+ /**
+ * Ids of all connections that are consuming from the destination
+ */
+ @XmlElement(name="consumer")
+ public List<LongIdLabeledDTO> consumers = new ArrayList<LongIdLabeledDTO>();
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Tue Jul 20 16:52:58 2010
@@ -140,6 +140,8 @@ class StompProtocolHandler extends Proto
dispatchQueue.release
})
+ override def connection = Some(StompProtocolHandler.this.connection)
+
def matches(delivery:Delivery) = {
if( delivery.message.protocol eq StompProtocol ) {
if( selector!=null ) {
@@ -306,6 +308,9 @@ class StompProtocolHandler extends Proto
// create the producer route...
val producer = new DeliveryProducer() {
+
+ override def connection = Some( StompProtocolHandler.this.connection )
+
override def dispatchQueue = queue
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Tue Jul 20 16:52:58 2010
@@ -54,6 +54,7 @@ object StompLoadClient {
var headers = List[String]()
var ack = "auto"
var selector:String = null
+ var durable = false
var destinationType = "queue"
var destinationCount = 1
@@ -161,6 +162,7 @@ object StompLoadClient {
"consumerSleep = "+consumerSleep+"\n"+
"ack = "+ack+"\n"+
"selector = "+selector+"\n"+
+ "durable = "+durable+"\n"+
""
}
@@ -352,15 +354,14 @@ object StompLoadClient {
while (!done.get) {
connect {
val headers = Map[AsciiBuffer, AsciiBuffer]()
- client.send("""
-SUBSCRIBE""" + (if(selector==null) {""} else {
-"""
-selector: """+selector
-}) + """
-ack:"""+ack+"""
-destination:"""+destination(id)+"""
+ client.send(
+ "SUBSCRIBE\n" +
+ (if(!durable) {""} else {"id:durable:mysub-"+id+"\n"}) +
+ (if(selector==null) {""} else {"selector: "+selector+"\n"}) +
+ "ack:"+ack+"\n"+
+ "destination:"+destination(id)+"\n"+
+ "\n")
-""")
client.flush
receiveLoop
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Tue Jul 20 16:52:58 2010
@@ -147,6 +147,13 @@ case class RuntimeResource(parent:Broker
node.queues.foreach { q=>
result.queues.add(new LongIdLabeledDTO(q.id, q.binding.label))
}
+ node.broadcast_consumers.flatMap( _.connection ).foreach { connection=>
+ result.consumers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+ }
+ node.broadcast_producers.flatMap( _.producer.connection ).foreach { connection=>
+ result.producers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+ }
+
result
})
}
@@ -200,6 +207,14 @@ case class RuntimeResource(parent:Broker
}
}
}
+
+ q.inbound_sessions.flatMap( _.producer.connection ).foreach { connection=>
+ result.producers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+ }
+ q.all_subscriptions.keysIterator.toSeq.flatMap( _.connection ).foreach { connection=>
+ result.consumers.add(new LongIdLabeledDTO(connection.id, connection.transport.getRemoteAddress))
+ }
+
result
}
}
Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.scaml Tue Jul 20 16:52:58 2010
@@ -25,13 +25,13 @@
%li
%a(href={ path("queues/"+x.id) }) #{x.label}
-%h2 Producers
+%h3 Broadcast Producers
%ul
- for( x <- producers )
%li
%a(href={ path("../../../../connections/"+x.id) }) #{x.label}
-%h2 Consumers
+%h3 Broadcast Consumers
%ul
- for( x <- consumers )
%li
Modified: activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml?rev=965902&r1=965901&r2=965902&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml (original)
+++ activemq/sandbox/activemq-apollo-actor/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.scaml Tue Jul 20 16:52:58 2010
@@ -42,6 +42,19 @@
%p flushing out of memory: #{memory(flushing_size)}
%p holding : #{flushed_items} flushed message references
+%h3 Producers
+%ul
+ - for( x <- producers )
+ %li
+ %a(href={ path("../../../../../../connections/"+x.id) }) #{x.label}
+
+%h3 Consumers
+%ul
+ - for( x <- consumers )
+ %li
+ %a(href={ path("../../../../../../connections/"+x.id) }) #{x.label}
+
+
- if ( entries.isEmpty )
%h2
Entries Dump