You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/08 21:36:07 UTC

svn commit: r1480418 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protoc...

Author: chirino
Date: Wed May  8 19:36:06 2013
New Revision: 1480418

URL: http://svn.apache.org/r1480418
Log:
Adding support for a query parameter to include detailed debug info about a connection's state.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Wed May  8 19:36:06 2013
@@ -124,7 +124,7 @@ class AmqpProtocolHandler extends Protoc
   var messages_sent = 0L
   var messages_received = 0L
 
-  override def create_connection_status = {
+  override def create_connection_status(debug:Boolean) = {
     var rc = new AmqpConnectionStatusDTO
     rc.protocol_version = "1.0.0"
     rc.user = security_context.user

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed May  8 19:36:06 2013
@@ -150,11 +150,11 @@ class BrokerConnection(var connector: Co
 
   protected override def on_transport_failure(error: IOException) = protocol_handler.on_transport_failure(error)
 
-  def get_connection_status = {
+  def get_connection_status(debug:Boolean=false) = {
     val result = if( protocol_handler==null ) {
       new ConnectionStatusDTO
     } else {
-      protocol_handler.create_connection_status
+      protocol_handler.create_connection_status(debug)
     }
 
     result.id = id.toString

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed May  8 19:36:06 2013
@@ -157,7 +157,7 @@ class AcceptingConnector(val broker:Brok
 
     for( (id, connection) <- broker.connections if connection.connector eq this ) {
       result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress.toString ) )
-      val status = connection.get_connection_status
+      val status = connection.get_connection_status(false)
       if( status!=null ) {
         result.messages_sent += status.messages_sent
         result.messages_received += status.messages_received
@@ -328,7 +328,7 @@ class AcceptingConnector(val broker:Brok
     val at_limit = at_connection_limit
     if( broker.connections.remove(connection.id).isDefined ) {
       connected.decrementAndGet()
-      val status = connection.get_connection_status
+      val status = connection.get_connection_status(false)
       if( status!=null ) {
         dead_messages_sent += status.messages_sent
         dead_messages_received += status.messages_received

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed May  8 19:36:06 2013
@@ -270,4 +270,19 @@ class Delivery {
 
   def redelivered = redeliveries = ((redeliveries+1).min(Short.MaxValue)).toShort
 
+  override def toString = {
+    "Delivery(" +
+      "sender:"+sender+", "+
+      "size:"+size+", "+
+      "message codec:"+message.codec.id+", "+
+      "expiration:"+expiration+", "+
+      "persistent:"+persistent+", "+
+      "redeliveries:"+redeliveries+", "+
+      "seq:"+seq+", "+
+      "storeKey:"+storeKey+", "+
+      "storeLocator:"+storeLocator+", "+
+      "uow:"+uow+", "+
+      "ack:"+(ack!=null)+
+    ")"
+  }
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed May  8 19:36:06 2013
@@ -1219,7 +1219,15 @@ class Queue(val router: LocalRouter, val
   class QueueDeliverySession(val producer: DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery]{
     retain
 
-    override def toString = Queue.this.toString
+    def odlToString = Queue.this.toString
+    override def toString = {
+      "QueueDeliverySession("+
+        "queue: "+Queue.this.id +
+        ", full:"+full+
+        ", "+downstream+
+      ")"
+    }
+
     override def consumer = Queue.this
 
     val downstream = session_manager.open(producer.dispatch_queue)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed May  8 19:36:06 2013
@@ -414,5 +414,14 @@ abstract class DeliveryProducerRoute(rou
     }
   }
 
+  override def toString = {
+    "last_send: "+last_send+
+    ", retained: "+reained_base.retained()+
+    ", is_connected: "+is_connected+
+    ", dispatch_delivery: "+dispatch_delivery+
+    ", dispatch_sessions: "+dispatch_sessions.size+
+    ", "+super.toString
+    ", targets: "+targets
+  }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed May  8 19:36:06 2013
@@ -56,9 +56,13 @@ abstract class Sink[T] {
   def map[Y](func: Y=>T ):Sink[Y] = new SinkMapper[Y,T] {
     def passing(value: Y) = func(value)
     def downstream = Sink.this
+    override def toString: String = downstream.toString
   }
 
   def flatMap[Y](func: Y=>Option[T]):Sink[Y] = new Sink[Y] with SinkFilter[T] {
+
+    override def toString: String = downstream.toString
+
     def downstream = Sink.this
     def offer(value:Y) = {
       if( full ) {
@@ -152,6 +156,7 @@ class TransportSink(val transport:Transp
   var refiller:Task = NOOP
   def full:Boolean = transport.full
   def offer(value:AnyRef) =  transport.offer(value)
+  override def toString: String = "TransportSink(full:"+full+")"
 }
 
 /**
@@ -161,7 +166,14 @@ class TransportSink(val transport:Transp
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class OverflowSink[T](val downstream:Sink[T]) extends AbstractOverflowSink[T]
+class OverflowSink[T](val downstream:Sink[T]) extends AbstractOverflowSink[T] {
+  override def toString: String = {
+    "OverflowSink("+
+      super.toString+
+      ", "+downstream+
+    ")"
+  }
+}
 
 abstract class AbstractOverflowSink[T] extends Sink[T] {
 
@@ -171,6 +183,11 @@ abstract class AbstractOverflowSink[T] e
 
   val overflow = new LinkedList[T]()
 
+  override def toString = {
+    "overflow: "+overflow
+    ", full: "+full
+  }
+
   def overflowed = !overflow.isEmpty
 
   def full = overflowed || downstream.full
@@ -229,7 +246,6 @@ abstract class AbstractOverflowSink[T] e
    */
   protected def onDelivered(value:T) = {
   }
-
 }
 
 
@@ -279,6 +295,15 @@ class SinkMux[T](val downstream:Sink[T])
 
   class ManagedSink extends Sink[T] {
 
+
+    override def toString: String = {
+      "ManagedSink("+
+        "rejection_handler: "+(rejection_handler)+
+        ", full: "+(full)+
+        ", "+downstream+
+      ")"
+    }
+
     var rejection_handler:(T)=>Unit = _
     var refiller:Task = NOOP
 
@@ -321,12 +346,23 @@ class CreditWindowFilter[T](val downstre
 
   var delivery_credits = 0
   var byte_credits = 0
-  var disabled = true
+  var enabled = true
+
 
-  override def full: Boolean = downstream.full || ( disabled && byte_credits <= 0 && delivery_credits <= 0 )
+  override def toString: String = {
+    "CreditWindowFilter("+
+     "enabled:"+enabled+
+      ", delivery_credits:"+delivery_credits+
+      ", byte_credits:"+byte_credits+
+      ", full:"+full+
+      ", "+downstream+
+    ")"
+  }
+
+  override def full: Boolean = downstream.full || ( enabled && byte_credits <= 0 && delivery_credits <= 0 )
 
   def disable = {
-    disabled = false
+    enabled = false
     refiller.run()
   }
 
@@ -400,6 +436,14 @@ class SessionSinkMux[T](val downstream:S
   var sessions = HashSet[Session[T]]()
   var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
 
+  override def toString: String = {
+    "SessionSinkMux(" +
+      "sessions: "+sessions.size+
+      ", overflowed_sessions: "+overflowed_sessions.size+
+      ", "+downstream+
+    ")"
+  }
+
   def open(producer_queue:DispatchQueue):SessionSink[T] = {
     val session = new Session[T](this, producer_queue)
     consumer_queue <<| ^{
@@ -560,6 +604,21 @@ class Session[T](mux:SessionSinkMux[T], 
   @volatile
   var enqueue_ts = mux.time_stamp
 
+
+  override def toString: String = {
+    "Session("+
+      "enqueue_item_counter:"+enqueue_item_counter+
+      ", enqueue_size_counter:"+enqueue_size_counter+
+      ", delivery_credits:"+delivery_credits+
+      ", size_credits:"+size_credits+
+      ", size_credits:"+size_credits+
+      ", overflow:"+overflow.size()+
+      ", stall_counter:"+stall_counter+
+      ", size_bonus:"+size_bonus +
+      ", full:"+full+
+    ")"
+  }
+
   def credit(delivery_credits:Int, size_credits:Int) = {
     if( delivery_credits!=0 || size_credits!=0 ) {
       credit_adder.merge((delivery_credits, size_credits))

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed May  8 19:36:06 2013
@@ -73,17 +73,20 @@ class Topic(val router:LocalRouter, val 
     def matches(message: Delivery) = true
     def is_persistent = false
     def dispatch_queue = null
-    def connect(producer: DeliveryProducer) = ProxyProducerSession(producer)
+    def connect(producer: DeliveryProducer) = new ProxyProducerSession(producer)
 
   }
 
 
-  case class ProxyProducerSession(val producer:DeliveryProducer) extends DeliverySession {
+  class ProxyProducerSession(val producer:DeliveryProducer) extends DeliverySession {
+
 
     dispatch_queue {
       proxy_sessions.add(this)
     }
 
+    override def toString: String = "ProxyProducerSession(topic="+id+")"
+
     def remaining_capacity = 1
     var enqueue_ts = 0L
     var enqueue_size_counter = 0L

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Wed May  8 19:36:06 2013
@@ -93,7 +93,7 @@ trait ProtocolHandler {
     this.connection = brokerConnection
   }
 
-  def create_connection_status = new ConnectionStatusDTO
+  def create_connection_status(debug:Boolean) = new ConnectionStatusDTO
 
   def on_transport_failure(error:IOException) = {
     trace(error)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Wed May  8 19:36:06 2013
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import org.fusesource.hawtdispatch._
 import java.nio.channels.{DatagramChannel, WritableByteChannel, ReadableByteChannel}
 import java.net.SocketAddress
-import org.apache.activemq.apollo.dto.{ProtocolDTO, UdpDTO, AcceptingConnectorDTO}
+import org.apache.activemq.apollo.dto.{ConnectionStatusDTO, ProtocolDTO, UdpDTO, AcceptingConnectorDTO}
 import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
 import java.util.Map.Entry
 import org.apache.activemq.apollo.util._
@@ -135,8 +135,8 @@ abstract class UdpProtocolHandler extend
   def broker = connection.connector.broker
   def queue = connection.dispatch_queue
 
-  override def create_connection_status = {
-    var rc = super.create_connection_status
+  override def create_connection_status(debug:Boolean) = {
+    var rc = new ConnectionStatusDTO
     rc.waiting_on = waiting_on
     rc.messages_received = messages_received
     rc

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Wed May  8 19:36:06 2013
@@ -219,7 +219,7 @@ class BrokerFunSuiteSupport extends FunS
     broker.dispatch_queue {
       info(" -- Connections -- ")
       for(connection <- broker.connections.values) {
-        info(json(connection.get_connection_status))
+        info(json(connection.get_connection_status(false)))
       }
 
       val router = broker.default_virtual_host.local_router

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java Wed May  8 19:36:06 2013
@@ -127,4 +127,11 @@ public class ConnectionStatusDTO extends
      */
     @XmlAttribute(name="subscription_count")
 	public int subscription_count;
+
+    /**
+     * Holds detailed state data used to debug connections.
+     */
+	@XmlAttribute(name="debug")
+	public String debug;
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java Wed May  8 19:36:06 2013
@@ -517,7 +517,7 @@ public class MqttProtocolHandler extends
     LongCounter messages_received = new LongCounter(0);
     int subscription_count = 0;
 
-    public MqttConnectionStatusDTO create_connection_status() {
+    public MqttConnectionStatusDTO create_connection_status(boolean debug) {
         MqttConnectionStatusDTO rc = new MqttConnectionStatusDTO();
         rc.protocol_version = "3.1";
         rc.messages_sent = messages_sent.get();

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java Wed May  8 19:36:06 2013
@@ -562,7 +562,7 @@ public class MqttSession {
             if (route.full()) {
                 // but once it gets full.. suspend to flow control the producer.
                 route.suspended = true;
-                handler._suspend_read("blocked sending to: " + route.dispatch_sessions().mkString(", "));
+                handler._suspend_read("blocked sending to: " + route.address);
             }
 
         } else {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed May  8 19:36:06 2013
@@ -128,7 +128,7 @@ class OpenwireProtocolHandler extends Pr
   var messages_received = 0L
   val preferred_wireformat_settings = new WireFormatInfo();
 
-  override def create_connection_status = {
+  override def create_connection_status(debug:Boolean) = {
     var rc = new OpenwireConnectionStatusDTO
     rc.protocol_version = ""+(if (wire_format == null) 0 else wire_format.getVersion)
     rc.user = login.map(_.toString).getOrElse(null)
@@ -712,7 +712,7 @@ class OpenwireProtocolHandler extends Pr
       // but once it gets full.. suspend, so that we get more messages
       // until it's not full anymore.
       route.suspended = true
-      suspend_read("blocked destination: "+route.dispatch_sessions.mkString(", "))
+      suspend_read("blocked sending to: "+route.addresses.mkString(", ").mkString(", "))
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed May  8 19:36:06 2013
@@ -238,7 +238,26 @@ class StompProtocolHandler extends Proto
 //    }
 
 
-    override def toString = "stomp subscription"+subscription_id.map(" id: "+_+",").getOrElse("")+" remote address: "+security_context.remote_address
+    override def toString ={
+      // Lets setup some helpers
+      def b( x:Boolean, y: =>String):String  = if(x) y else ""
+      def h( x:AnyRef, y: =>String):String  = if(x!=null) y else ""
+      def o[T]( x:Option[T], y: =>String):String = if(x.isDefined) y else ""
+
+      "StompConsumer("+
+        o(subscription_id, "subscription_id: "+subscription_id.get+", ")+
+        "addresses: "+addresses.mkString(",")+", "+
+        "ack_mode: "+ack_mode+", "+
+        initial_credit_window+
+        h(selector, ", selector: "+selector._1)+
+        b(browser, ", browser")+
+        b(close_on_drain, ", close_on_drain")+
+        b(exclusive, ", exclusive")+
+        b(from_seq!=0, ", from_seq: "+from_seq)+
+        o(include_seq, ", include_seq: "+include_seq)+
+        ", "+session_manager+
+      ")"
+    }
 
     override def start_from_tail = from_seq == -1
 
@@ -597,7 +616,13 @@ class StompProtocolHandler extends Proto
 
       val downstream = session_manager.open(producer.dispatch_queue)
 
-      override def toString = "connection to "+StompProtocolHandler.this.connection.transport.getRemoteAddress
+      override def toString = {
+        "StompConsumerSession("+
+          "connection to: "+StompProtocolHandler.this.connection.transport.getRemoteAddress+", "
+          "closed: "+closed+", "
+          "downstream: "+downstream
+        ")"
+      }
 
       def consumer = StompConsumer.this
       var closed = false
@@ -768,7 +793,7 @@ class StompProtocolHandler extends Proto
   var messages_sent = 0L
   var messages_received = 0L
 
-  override def create_connection_status = {
+  override def create_connection_status(debug:Boolean) = {
     var rc = new StompConnectionStatusDTO
     rc.protocol_version = if( protocol_version == null ) null else protocol_version.toString
     rc.user = security_context.user
@@ -776,6 +801,25 @@ class StompProtocolHandler extends Proto
     rc.waiting_on = waiting_on()
     rc.messages_sent = messages_sent
     rc.messages_received = messages_received
+    if( debug ) {
+      import collection.JavaConversions._
+      val out = new StringBuilder
+      out.append("\n--- connection ---\n")
+      out.append("  { routing_size:"+routing_size+" }\n")
+      out.append("--- producers ---\n")
+      for( p <- producer_routes.values() ) {
+        out.append("  { "+p+" }\n")
+      }
+      out.append("--- consumers ---\n")
+      for( c <- consumers.values ) {
+        out.append("  { "+c+" }\n")
+      }
+      out.append("--- transactions ---\n")
+      for( t <- transactions.values ) {
+        out.append("  { "+t+" }\n")
+      }
+      rc.debug = out.toString()
+    }
     rc
   }
 
@@ -1221,6 +1265,10 @@ class StompProtocolHandler extends Proto
       }
       super.offer(delivery)
     }
+
+    override def toString = {
+      "addresses:"+key+", routing_items:"+routing_items+", "+super.toString
+    }
   }
 
 
@@ -1434,7 +1482,7 @@ class StompProtocolHandler extends Proto
       // but once it gets full.. suspend, so that we get more stomp messages
       // until it's not full anymore.
       route.suspended = true
-      suspend_read("blocked sending to: "+route.dispatch_sessions.mkString(", "))
+      suspend_read("blocked sending to: "+route.addresses.mkString(", "))
     }
     frame.release
   }
@@ -1718,6 +1766,11 @@ class StompProtocolHandler extends Proto
 
     val queue = ListBuffer[((StoreUOW)=>Unit, ()=>Unit)]()
 
+
+    override def toString: String = {
+      "{ actions: "+queue.size+" }"
+    }
+
     def add(on_commit:(StoreUOW)=>Unit, on_rollback:()=>Unit=null):Unit = {
       queue += ((on_commit, on_rollback))
     }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Wed May  8 19:36:06 2013
@@ -967,7 +967,7 @@ class BrokerResource() extends Resource 
       monitoring(broker) {
 
         val records = sync_all(broker.connections.values) { value =>
-          value.get_connection_status
+          value.get_connection_status(false)
         }
 
         val rc:FutureResult[DataPageDTO] = records.map(narrow(classOf[ConnectionStatusDTO], _, f, q, p, ps, o))
@@ -978,10 +978,10 @@ class BrokerResource() extends Resource 
 
   @GET @Path("/connections/{id}")
   @ApiOperation(value = "Gets that status of a connection.")
-  def connection(@PathParam("id") id : Long):ConnectionStatusDTO = {
+  def connection(@PathParam("id") id : Long, @QueryParam("debug") debug:Boolean):ConnectionStatusDTO = {
     with_connection(id){ connection=>
       monitoring(connection.connector.broker) {
-        connection.get_connection_status
+        connection.get_connection_status(debug)
       }
     }
   }
@@ -1027,10 +1027,11 @@ class BrokerResource() extends Resource 
   @GET
   @Path("/hawtdispatch/profile")
   @ApiOperation(value="Enables or disables profiling")
-  def hawtdispatch_profile(@QueryParam("enabled") enabled : Boolean) = ok {
+  def hawtdispatch_profile(@QueryParam("enabled") enabled : Boolean):String = {
     with_broker { broker =>
       admining(broker) {
         Dispatch.profile(enabled)
+        ""
       }
     }
   }